Posted to commits@pulsar.apache.org by "merlimat (via GitHub)" <gi...@apache.org> on 2024/03/03 03:07:09 UTC

[PR] WIP: PIP-342: OTel client metrics support [pulsar]

merlimat opened a new pull request, #22179:
URL: https://github.com/apache/pulsar/pull/22179

   
   PIP: #22178 
   
   <!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->
   
   ### Motivation
   
   <!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->
   
   ### Modifications
   
   <!-- Describe the modifications you've done. -->
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [x] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: <!-- ENTER URL HERE -->
   
   <!--
   After opening this PR, the build in apache/pulsar will fail and instructions will
   be provided for opening a PR in the PR author's forked repository.
   
   apache/pulsar pull requests should be first tested in your own fork since the 
   apache/pulsar CI based on GitHub Actions has constrained resources and quota.
   GitHub Actions provides separate quota for pull requests that are executed in 
   a forked repository.
   
   The tests will be run in the forked repository until all PR review comments have
   been handled, the tests pass and the PR is approved by a reviewer.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513265863


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -451,15 +452,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
     ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
 
     /**
-     * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
+     * Set the interval between each stat info <i>(default: disabled)</i> Stats will be activated with positive

Review Comment:
   I'd agree if the current metrics were of any use, but unfortunately they are not.





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511963121


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java:
##########
@@ -395,6 +397,11 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     )
     private String description;
 
+
+    private transient OpenTelemetry openTelemetry;

Review Comment:
   The builder itself has to be serializable. This is to support environments like Storm, where the builder is set up on one node and then pushed to all the nodes.
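   A minimal sketch of how a `transient` field coexists with a serializable builder, using only JDK serialization. `ClientConfigSketch` and `TelemetryHandle` are hypothetical stand-ins (not Pulsar classes) for the configuration data and an `OpenTelemetry` instance. The plain settings survive the round trip to another node, while the transient handle is skipped and comes back as `null`, so in this sketch each node would have to attach its own instance after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a live, non-serializable handle such as an OpenTelemetry instance.
final class TelemetryHandle {
    final String name;

    TelemetryHandle(String name) {
        this.name = name;
    }
}

// Hypothetical, simplified client configuration: plain settings serialize, the live handle does not.
class ClientConfigSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    String serviceUrl;
    // transient: skipped by Java serialization, so it is null on the receiving node.
    // Without this modifier, writeObject would throw NotSerializableException.
    transient TelemetryHandle telemetry;

    // Simulates shipping the configuration to another node: serialize, then deserialize.
    static ClientConfigSketch roundTrip(ClientConfigSketch conf) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(conf);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ClientConfigSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```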
   





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1516556578


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",

Review Comment:
   Yes, correct





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511508796


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);

Review Comment:
   Good point. Fixed





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1516555109


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());
+        consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
+                "Counter of sessions closed", topic, Attributes.empty());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received.count", Unit.Messages,

Review Comment:
   👍 





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513085236


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,

Review Comment:
   (I've already changed it to use separate metrics)
   
   A "consumer" object in the client SDK can be associated with several sessions on the broker side, one at a time. For example, after a topic is moved, the client will reconnect to a different broker and create a new "session".
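   To illustrate the one-consumer-to-many-sessions relationship, here is a hypothetical model (plain JDK, not the Pulsar implementation) in which each reconnection to a new broker closes one session and opens another. If the opened/closed counters are incremented once per session, as their `Unit.Sessions` unit and "Counter of sessions opened" descriptions suggest, a single consumer object can account for several opens over its lifetime.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical model: one client-side consumer object, many broker-side sessions over time.
class ConsumerSessionSketch {
    // Session-level counters, in the spirit of "pulsar.client.consumer.opened" / "closed".
    final LongAdder sessionsOpened = new LongAdder();
    final LongAdder sessionsClosed = new LongAdder();
    private boolean connected;

    void connect() {
        if (!connected) {
            connected = true;
            sessionsOpened.increment();
        }
    }

    void disconnect() {
        if (connected) {
            connected = false;
            sessionsClosed.increment();
        }
    }

    // E.g. the topic moved: the old session ends and a new one starts on another broker.
    void reconnectToNewBroker() {
        disconnect();
        connect();
    }
}
```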





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1520301299


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());

Review Comment:
   I mean: for consumer metrics, should we expose them at the subscription level?





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511850762


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,

Review Comment:
   For example:
   
   ```
   # HELP pulsar_client_received_total Number of messages received
   # TYPE pulsar_client_received_total counter
   pulsar_client_received_total{otel_scope_name="org.apache.pulsar.client",otel_scope_version="3.3.0-SNAPSHOT",pulsar_namespace="public/default",pulsar_tenant="public",pulsar_topic="persistent://public/default/pt"} 122.0
   # HELP pulsar_client_received_bytes_total Bytes received
   # TYPE pulsar_client_received_bytes_total counter
   pulsar_client_received_bytes_total{otel_scope_name="org.apache.pulsar.client",otel_scope_version="3.3.0-SNAPSHOT",pulsar_namespace="public/default",pulsar_tenant="public",pulsar_topic="persistent://public/default/pt"} 124928.0
   ```
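   The exported names above are consistent with the usual OpenTelemetry-to-Prometheus translation: dots become underscores, a suffix is appended for real units such as bytes, and monotonic counters gain `_total`. The helper below is a simplified sketch of that convention, not the exporter's code. It assumes `Unit.Messages` is exported as the annotation unit `{message}` (which adds no suffix) and `Unit.Bytes` as `By`; this is how two counters sharing the instrument name `pulsar.client.received` can end up as two distinct Prometheus metric names.

```java
import java.util.Map;

// Simplified sketch of OpenTelemetry -> Prometheus name translation (not the real exporter).
// Assumption: Unit.Messages is exported as "{message}" and Unit.Bytes as "By".
final class PromNames {
    private static final Map<String, String> UNIT_SUFFIX = Map.of("By", "bytes", "s", "seconds");

    static String counterName(String otelName, String unit) {
        // Characters not allowed in Prometheus metric names (such as '.') become '_'.
        StringBuilder name = new StringBuilder(otelName.replaceAll("[^a-zA-Z0-9_:]", "_"));
        // Annotation units in curly braces (e.g. "{message}") carry no suffix.
        // Other units not in the table are appended verbatim in this sketch.
        if (unit != null && !unit.isEmpty() && !unit.startsWith("{")) {
            String suffix = UNIT_SUFFIX.getOrDefault(unit, unit);
            if (!name.toString().endsWith("_" + suffix)) {
                name.append('_').append(suffix);
            }
        }
        // Monotonic counters get the "_total" suffix.
        return name.append("_total").toString();
    }

    // Attribute keys become label names: e.g. "pulsar.topic" -> "pulsar_topic".
    static String labelName(String attributeKey) {
        return attributeKey.replaceAll("[^a-zA-Z0-9_]", "_");
    }
}
```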





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511764443


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());
+        histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",

Review Comment:
   The 4 commands are all different types of lookup. That's why I put them together. 





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513094996


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());
+        histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",

Review Comment:
   Yes, it makes sense to have them aggregated together. Later we might add other types of lookup and we shouldn't need to update all the dashboards each time.





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "asafm (via GitHub)" <gi...@apache.org>.
asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1512321989


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());
+        histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",

Review Comment:
   Does it make sense, in some way, to aggregate across all four types (i.e. over the entire instrument)?
   That's the rule of thumb for using attributes instead of separate instrument names.
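   A hypothetical sketch of that rule of thumb: keep one instrument and put the lookup kind in a `type` attribute when summing across all attribute values gives a meaningful number (here, "all lookups"). `LookupCounts` is an in-memory stand-in for the histogram that tracks only observation counts per attribute set; the attribute keys mirror the diff, and the `schema` type in the usage is invented for illustration. A total over the whole instrument keeps working unchanged when a new lookup type is added.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical in-memory stand-in for one histogram instrument (e.g. "pulsar.client.lookup"),
// recording only how many observations were made per attribute set.
final class LookupCounts {
    private final Map<Map<String, String>, Long> countByAttributes = new HashMap<>();

    void record(String lookupType, boolean success) {
        Map<String, String> attrs = Map.of(
                "transport-type", "binary",
                "type", lookupType,
                "success", Boolean.toString(success));
        countByAttributes.merge(attrs, 1L, Long::sum);
    }

    // Count for a single attribute value, e.g. type=topic.
    long countFor(String lookupType) {
        return countByAttributes.entrySet().stream()
                .filter(e -> Objects.equals(e.getKey().get("type"), lookupType))
                .mapToLong(e -> e.getValue())
                .sum();
    }

    // Aggregating across every type is meaningful: it is simply "all lookups".
    long total() {
        return countByAttributes.values().stream().mapToLong(Long::longValue).sum();
    }
}
```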
   





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511757658


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
+
+    ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality metricsCardinality);

Review Comment:
   OK, I just saw that we have to downcast to it. I'll think about how to refactor this.





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1517940520


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, "pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {
+            client.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .create();
+            fail("Should have failed the producer creation");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",
+                Attributes.builder().put("pulsar.failure.type", "tcp-failed").build()) >= 1);
+    }
+
+    @Test
+    public void testPublishFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(admin.getServiceUrl())
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .sendTimeout(3, TimeUnit.SECONDS)
+                .create();
+
+        // Make the client switch to non-existing broker to make publish fail
+        client.updateServiceUrl("pulsar://invalid-address:6650");
+
+
+        try {
+            producer.send("Hello");
+            fail("Should have failed to publish");
+        } catch (Exception e) {
+            // expected
+        }
+
+        var metrics = collectMetrics();
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsFailure = nsAttrs.toBuilder()
+                .put("success", false)
+                .build();
+
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 1, nsAttrsFailure);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure);
+    }
+
+    @Test
+    public void testConsumerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("Hello");
+        }
+
+        Thread.sleep(1000);
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        var metrics = collectMetrics();
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, nsAttrs);
+
+        Message<String> msg1 = consumer.receive();
+        consumer.acknowledge(msg1);
+
+        Message<String> msg2 = consumer.receive();
+        consumer.negativeAcknowledge(msg2);
+
+        /* Message<String> msg3 = */ consumer.receive();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 7, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 7, nsAttrs);
+
+        // Let msg3 reach the ack timeout
+        Thread.sleep(3000);
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 8, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 8, nsAttrs);
+
+        assertCounterValue(metrics, "pulsar.client.consumer.message.ack", 1, nsAttrs);

Review Comment:
   Yes, the metric is there; I forgot to check it in the test.
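The expected `prefetched.count` / `prefetched.size` values in the consumer test above follow from simple bookkeeping on the client-side receiver queue: 10 prefetched, minus 3 received gives 7, plus 1 redelivered gives 8. The toy model below is hypothetical (it is not Pulsar's `ConsumerImpl`). It assumes that during the 3-second sleep only msg3 comes back through the 1-second ack timeout, while the negatively acknowledged msg2 does not, because the negative-ack redelivery delay (1 minute by default) is longer than the sleep.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical toy model of a consumer receiver queue, used only to show how the
 * prefetched.count / prefetched.size values in the test are derived.
 * String.length() stands in for payload bytes (equal for ASCII payloads).
 */
final class PrefetchModel {
    private final Deque<String> queue = new ArrayDeque<>();
    private long prefetchedBytes;

    /** The broker pushes a message into the client-side receiver queue. */
    void deliver(String payload) {
        queue.addLast(payload);
        prefetchedBytes += payload.length();
    }

    /** The application takes the next message out of the queue. */
    String receive() {
        String payload = queue.removeFirst();
        prefetchedBytes -= payload.length();
        return payload;
    }

    long prefetchedCount() {
        return queue.size();
    }

    long prefetchedSize() {
        return prefetchedBytes;
    }

    public static void main(String[] args) {
        PrefetchModel model = new PrefetchModel();
        for (int i = 0; i < 10; i++) {
            model.deliver("Hello");           // 10 messages prefetched
        }
        model.receive();                      // msg1 (acking it does not touch the queue)
        model.receive();                      // msg2 (nacked; assumed not redelivered in time)
        String msg3 = model.receive();        // msg3, never acknowledged
        System.out.println(model.prefetchedCount() + " " + model.prefetchedSize()); // prints "7 35"

        // Assumption: the ack timeout (1 s) fires for msg3 during the 3 s sleep.
        model.deliver(msg3);
        System.out.println(model.prefetchedCount() + " " + model.prefetchedSize()); // prints "8 40"
    }
}
```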



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1518195267


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, "pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {
+            client.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .create();
+            fail("Should have failed the producer creation");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",
+                Attributes.builder().put("pulsar.failure.type", "tcp-failed").build()) >= 1);
+    }
+
+    @Test
+    public void testPublishFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(admin.getServiceUrl())
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .sendTimeout(3, TimeUnit.SECONDS)
+                .create();
+
+        // Make the client switch to non-existing broker to make publish fail
+        client.updateServiceUrl("pulsar://invalid-address:6650");
+
+
+        try {
+            producer.send("Hello");
+            fail("Should have failed to publish");
+        } catch (Exception e) {
+            // expected
+        }
+
+        var metrics = collectMetrics();
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsFailure = nsAttrs.toBuilder()
+                .put("success", false)
+                .build();
+
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 1, nsAttrsFailure);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure);
+    }
+
+    @Test
+    public void testConsumerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("Hello");
+        }
+
+        Thread.sleep(1000);
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        var metrics = collectMetrics();
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, nsAttrs);
+
+        Message<String> msg1 = consumer.receive();
+        consumer.acknowledge(msg1);
+
+        Message<String> msg2 = consumer.receive();
+        consumer.negativeAcknowledge(msg2);
+
+        /* Message<String> msg3 = */ consumer.receive();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 7, nsAttrs);

Review Comment:
   👍 
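The consumer test relies on fixed `Thread.sleep(1000)` and `Thread.sleep(3000)` pauses before collecting metrics, which can be flaky on a slow CI machine. Pulsar tests commonly use Awaitility for this kind of wait; the stdlib-only sketch below shows the same polling idea. `Polling.waitUntil` is a hypothetical helper, not part of the PR.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper: re-checks a condition instead of sleeping a fixed time. */
final class Polling {
    private Polling() {
    }

    /**
     * Evaluates {@code condition} every {@code interval} until it returns true
     * or {@code timeout} elapses.
     *
     * @return true if the condition became true before the timeout
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a metric value that another thread updates asynchronously.
        AtomicLong prefetched = new AtomicLong();
        Thread deliverer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                prefetched.incrementAndGet();
            }
        });
        deliverer.start();
        boolean reached = waitUntil(() -> prefetched.get() == 10,
                Duration.ofSeconds(5), Duration.ofMillis(50));
        deliverer.join();
        System.out.println("reached=" + reached);
    }
}
```

In the real test, Awaitility's `untilAsserted` would also tolerate the `AssertionError` that `getCounterValue` throws while a metric point is still missing, which this `BooleanSupplier` version does not.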





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511962295


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
+
+    ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality metricsCardinality);

Review Comment:
   Done
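In the tests, the per-topic attributes come out as `pulsar.tenant` and `pulsar.namespace` (for example `my-property` and `my-property/my-ns`). The sketch below shows one way a cardinality setting, like the `MetricsCardinality` argument of `openTelemetryMetricsCardinality`, could truncate a fully qualified topic name into such attributes. The enum values, the `pulsar.topic` key and the parsing helper are assumptions for illustration, not the PR's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical cardinality levels; the PR's MetricsCardinality enum may differ. */
enum Cardinality {
    NONE, TENANT, NAMESPACE, TOPIC
}

final class TopicAttributes {
    private TopicAttributes() {
    }

    /**
     * Builds metric attributes from a topic such as
     * "persistent://my-property/my-ns/my-topic", keeping only the levels
     * allowed by {@code cardinality}.
     */
    static Map<String, String> attributesFor(String topic, Cardinality cardinality) {
        // Strip the "persistent://" or "non-persistent://" domain prefix, if present.
        int schemeEnd = topic.indexOf("://");
        String path = schemeEnd >= 0 ? topic.substring(schemeEnd + 3) : topic;
        String[] parts = path.split("/", 3); // tenant, namespace, local topic name
        if (parts.length < 3) {
            throw new IllegalArgumentException("Not a fully qualified topic: " + topic);
        }

        Map<String, String> attrs = new LinkedHashMap<>();
        if (cardinality.compareTo(Cardinality.TENANT) >= 0) {
            attrs.put("pulsar.tenant", parts[0]);
        }
        if (cardinality.compareTo(Cardinality.NAMESPACE) >= 0) {
            attrs.put("pulsar.namespace", parts[0] + "/" + parts[1]);
        }
        if (cardinality.compareTo(Cardinality.TOPIC) >= 0) {
            attrs.put("pulsar.topic", topic); // assumed attribute key
        }
        return attrs;
    }

    public static void main(String[] args) {
        String topic = "persistent://my-property/my-ns/my-topic";
        // prints {pulsar.tenant=my-property, pulsar.namespace=my-property/my-ns}
        System.out.println(attributesFor(topic, Cardinality.NAMESPACE));
    }
}
```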





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511864558


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,

Review Comment:
   Sounds good
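The excerpt registers `messagesReceivedCounter` and `bytesReceivedCounter` under the same name, `pulsar.client.received`, differing only by unit. The OpenTelemetry specification treats an instrument's name, kind, unit and description as its identity, so a second registration that differs only by unit is a conflicting duplicate registration (SDKs are expected to warn), and backends would see one metric name carrying two units. The registry below is a hypothetical sketch (not the OTel SDK) that makes the conflict explicit; the unit strings and the `.count` / `.size` names are assumptions modeled on the names used in the tests.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical instrument registry (not the OpenTelemetry SDK) that rejects a
 * second instrument with an existing name but a different unit.
 */
final class InstrumentRegistry {
    private final Map<String, String> unitsByName = new HashMap<>();

    /** Registers a name/unit pair; throws if the name is already used with another unit. */
    void register(String name, String unit) {
        String existing = unitsByName.putIfAbsent(name, unit);
        if (existing != null && !existing.equals(unit)) {
            throw new IllegalStateException("Instrument '" + name + "' already registered with unit '"
                    + existing + "', cannot re-register with unit '" + unit + "'");
        }
    }

    public static void main(String[] args) {
        InstrumentRegistry registry = new InstrumentRegistry();
        registry.register("pulsar.client.received", "{message}");
        try {
            registry.register("pulsar.client.received", "By"); // same name, different unit
        } catch (IllegalStateException e) {
            System.out.println("Conflict: " + e.getMessage());
        }
        // Hypothetical distinct names avoid the conflict.
        registry.register("pulsar.client.consumer.received.count", "{message}");
        registry.register("pulsar.client.consumer.received.size", "By");
    }
}
```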





[HOW TO FOLLOW GITHUB] Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by Dave Fisher <wa...@apache.org>.
For anyone who wishes to watch activity from within a Pulsar GitHub repository please consider subscribing to commits@pulsar.apache.org <ma...@pulsar.apache.org> by sending an email to commits-subscribe@pulsar.apache.org <ma...@pulsar.apache.org> and replying to a CONFIRM email.

Regards,
Dave

> On Mar 6, 2024, at 12:50 PM, asafm (via GitHub) <gi...@apache.org> wrote:
> 
> 
> asafm commented on code in PR #22179:
> URL: https://github.com/apache/pulsar/pull/22179#discussion_r1514920721
> 
> 
> ##########
> pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
> ##########
> @@ -451,15 +452,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
>     ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
> 
>     /**
> -     * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
> +     * Set the interval between each stat info <i>(default: disabled)</i> Stats will be activated with positive
> 
> Review Comment:
>   How do we alert the users that those metrics are still subject to changes that may break them?
> 
> 
> 


Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "asafm (via GitHub)" <gi...@apache.org>.
asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1514920721


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -451,15 +452,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
     ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
 
     /**
-     * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
+     * Set the interval between each stat info <i>(default: disabled)</i> Stats will be activated with positive

Review Comment:
   How do we alert the users that those metrics are still subject to changes that may break them?





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1516555860


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java:
##########
@@ -60,11 +64,31 @@ public class HttpLookupService implements LookupService {
     private static final String BasePathV1 = "lookup/v2/destination/";
     private static final String BasePathV2 = "lookup/v2/topic/";
 
-    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
+    private final LatencyHistogram histoGetBroker;
+    private final LatencyHistogram histoGetTopicMetadata;
+    private final LatencyHistogram histoGetSchema;
+    private final LatencyHistogram histoListTopics;
+
+    public HttpLookupService(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
         this.httpClient = new HttpClient(conf, eventLoopGroup);
         this.useTls = conf.isUseTls();
         this.listenerName = conf.getListenerName();
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("pulsar.lookup.transport-type"), "binary");

Review Comment:
   🤦 
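The line under review tags the `HttpLookupService` histograms with `pulsar.lookup.transport-type` = `binary`, the same value the binary-protocol lookup reports, which the reply acknowledges as a slip. The direct fix is simply a different constant in `HttpLookupService`; the helper below is a hypothetical sketch of keeping the label consistent by deriving it from the service URL scheme. The `http` value is an assumption about what the HTTP path should report.

```java
import java.net.URI;
import java.util.Locale;

/** Hypothetical helper mapping a Pulsar service URL to a lookup transport-type attribute value. */
final class LookupTransport {
    private LookupTransport() {
    }

    static String transportType(String serviceUrl) {
        String scheme = URI.create(serviceUrl).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("Service URL has no scheme: " + serviceUrl);
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "pulsar":
            case "pulsar+ssl":
                return "binary"; // Pulsar binary protocol (BinaryProtoLookupService)
            case "http":
            case "https":
                return "http";   // REST lookups (HttpLookupService); assumed value
            default:
                throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
    }

    public static void main(String[] args) {
        System.out.println(transportType("pulsar://localhost:6650"));        // prints "binary"
        System.out.println(transportType("https://broker.example.com:8443")); // prints "http"
    }
}
```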





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513299277


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java:
##########
@@ -29,9 +30,12 @@
  *
  * <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
  * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ * 
+ * @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats

Review Comment:
   Yes, the PR is marked with "doc-required". We will add documentation on how to use this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#issuecomment-1981210195

   @asafm I applied most of the suggestions, please give it another pass




Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513086036


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,

Review Comment:
   What would be against the guidelines?
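On the naming question: the OpenTelemetry API specification constrains instrument names syntactically (an ASCII letter first, then letters, digits, `_`, `.`, `-` or `/`, at most 255 characters in recent spec versions), while the semantic-convention guidelines add style rules such as lowercase, dot-separated namespaces. The sketch below checks only the syntactic rule, so `pulsar.client.connections.opened` passing it says nothing about the style guidelines the reviewer may have meant. `InstrumentNames` is a hypothetical helper.

```java
import java.util.regex.Pattern;

/** Hypothetical checker for the OpenTelemetry instrument-name syntax (not the style guidelines). */
final class InstrumentNames {
    // First char: ASCII letter; then up to 254 letters, digits, '_', '.', '-' or '/'.
    private static final Pattern VALID = Pattern.compile("[A-Za-z][A-Za-z0-9_./-]{0,254}");

    private InstrumentNames() {
    }

    static boolean isValid(String name) {
        return name != null && VALID.matcher(name).matches();
    }

    public static void main(String[] args) {
        String[] candidates = {
            "pulsar.client.connections.opened",
            "pulsar.client.lookup.duration",
            "9pulsar.client",   // starts with a digit
            "pulsar client",    // contains a space
        };
        for (String name : candidates) {
            System.out.println(name + " -> " + isValid(name));
        }
    }
}
```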





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "asafm (via GitHub)" <gi...@apache.org>.
asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1515713887


##########
pulsar-broker/pom.xml:
##########
@@ -149,6 +149,11 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>

Review Comment:
   `<scope>test</scope>`



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,

Review Comment:
   You can use the AssertJ extension that OTel provides, as @dragosvictor did on the broker side with `assertThat(metricReader.collectAllMetrics())`; from there you get quite readable assertion commands: https://github.com/apache/pulsar/pull/22058/files#diff-6d0bc9489f6aa68a108c36624bd1deb6b65b61cb15ef5da7887f2b6974072e9a



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)

Review Comment:
   This attribute should also be prefixed. Perhaps `pulsar.response.status`, which can be either `success` or `failed`? I didn't put `lookup` in the name, since I presume other client commands will need it too.
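   A minimal sketch of this suggestion, assuming the boolean `success` attribute is replaced by a string attribute `pulsar.response.status` with values `success` or `failed`. The class `ResponseStatusAttributes` and its method `withStatus` are hypothetical names. A sorted `Map<String, String>` stands in for OpenTelemetry's `Attributes` so the sketch compiles with the JDK alone:

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: not part of the PR or of the Pulsar client API.
final class ResponseStatusAttributes {
    // Prefixed key suggested in the review. It deliberately does not mention
    // "lookup", so other client commands could reuse it.
    static final String KEY = "pulsar.response.status";

    private ResponseStatusAttributes() {
    }

    // Returns an unmodifiable copy of base with the status attribute added.
    // The TreeMap is only a stand-in for io.opentelemetry.api.common.Attributes.
    static Map<String, String> withStatus(Map<String, String> base, boolean succeeded) {
        Map<String, String> attrs = new TreeMap<>(base);
        attrs.put(KEY, succeeded ? "success" : "failed");
        return Collections.unmodifiableMap(attrs);
    }
}
```

   With the real OpenTelemetry API, the test would build the same attribute set with something like `nsAttrs.toBuilder().put("pulsar.response.status", "success").build()` instead of `.put("success", true)`.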



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());
+        consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
+                "Counter of sessions closed", topic, Attributes.empty());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received.count", Unit.Messages,
+                "The number of messages explicitly received by the consumer application", topic, Attributes.empty());
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received.size", Unit.Bytes,
+                "The number of bytes explicitly received by the consumer application", topic, Attributes.empty());
+        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.prefetched.count", Unit.Messages,
+                "Number of messages currently sitting in the consumer pre-fetch queue", topic, Attributes.empty());
+        bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.prefetched.size", Unit.Bytes,
+                "Total number of bytes currently sitting in the consumer pre-fetch queue", topic, Attributes.empty());

Review Comment:
   "The size of the messages currently sitting in the consumer pre-fetch queue"



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());
+        consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
+                "Counter of sessions closed", topic, Attributes.empty());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received.count", Unit.Messages,

Review Comment:
   Why not `pulsar.client.consumer.message.received.count` and `pulsar.client.consumer.message.received.size`? That would align with the metrics below and with the producer metrics `pulsar.client.producer.message.send.*`.
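   One way to keep producer and consumer names symmetric is to compose them from a single pattern, `pulsar.client.<role>.message.<action>.<measure>`, inferred here from the producer names used in this PR's test. A minimal sketch; `ClientMetricNames` and `message(...)` are hypothetical:

```java
// Hypothetical helper: composes instrument names from one pattern so that
// consumer and producer metrics cannot drift apart, e.g.
// pulsar.client.producer.message.send.size and
// pulsar.client.consumer.message.received.size.
final class ClientMetricNames {
    private static final String PREFIX = "pulsar.client.";

    private ClientMetricNames() {
    }

    // role: "producer" or "consumer"; action: e.g. "send", "received", "pending";
    // measure: e.g. "count", "size", "duration".
    static String message(String role, String action, String measure) {
        return PREFIX + role + ".message." + action + "." + measure;
    }
}
```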



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, "pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {
+            client.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .create();
+            fail("Should have failed the producer creation");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",
+                Attributes.builder().put("pulsar.failure.type", "tcp-failed").build()) >= 1);
+    }
+
+    @Test
+    public void testPublishFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(admin.getServiceUrl())
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .sendTimeout(3, TimeUnit.SECONDS)
+                .create();
+
+        // Switch the client to a non-existent broker so that publishing fails
+        client.updateServiceUrl("pulsar://invalid-address:6650");
+
+
+        try {
+            producer.send("Hello");
+            fail("Should have failed to publish");
+        } catch (Exception e) {
+            // expected
+        }
+
+        var metrics = collectMetrics();
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsFailure = nsAttrs.toBuilder()
+                .put("success", false)
+                .build();
+
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 1, nsAttrsFailure);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure);
+    }
+
+    @Test
+    public void testConsumerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("Hello");
+        }
+
+        Thread.sleep(1000);
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        var metrics = collectMetrics();
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, nsAttrs);
+
+        Message<String> msg1 = consumer.receive();
+        consumer.acknowledge(msg1);
+
+        Message<String> msg2 = consumer.receive();
+        consumer.negativeAcknowledge(msg2);
+
+        /* Message<String> msg3 = */ consumer.receive();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 7, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 7, nsAttrs);
+
+        // Let msg3 reach its ack timeout
+        Thread.sleep(3000);
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 8, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 8, nsAttrs);
+
+        assertCounterValue(metrics, "pulsar.client.consumer.message.ack", 1, nsAttrs);

Review Comment:
   The only thing missing here is `receive`: the number of messages I received explicitly by calling `receive()` or `receiveBatch()`. It would help me figure out that I received 100 messages but didn't ack or nack them.
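   A minimal sketch of why a `received` counter helps: alongside ack and nack counts, it lets you derive how many messages the application holds without having acknowledged or negatively acknowledged them. `ReceiveAckTracker` and its methods are hypothetical. They only illustrate the arithmetic and are not the PR's instruments:

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: counts messages handed to the application by
// receive()/receiveBatch() next to individual ack and nack counts.
final class ReceiveAckTracker {
    private final LongAdder received = new LongAdder();
    private final LongAdder acked = new LongAdder();
    private final LongAdder nacked = new LongAdder();

    // Call with 1 for receive(), or with the batch size for receiveBatch().
    void onReceived(int count) {
        received.add(count);
    }

    void onAck() {
        acked.increment();
    }

    void onNack() {
        nacked.increment();
    }

    long received() {
        return received.sum();
    }

    // Messages the application received but has neither acked nor nacked yet.
    long outstanding() {
        return received.sum() - acked.sum() - nacked.sum();
    }
}
```

   A real counter would also need to decide how redeliveries (after a nack or an ack timeout) and cumulative acks are counted. This sketch simply counts every delivery and every individual ack.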



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")

Review Comment:
   Shouldn't the namespace be just `my-ns`?
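   To compare the two options, here is a minimal sketch that derives both forms from a v2 topic name, assuming the form `persistent://tenant/namespace/topic`. `NamespaceAttributes` and `fromTopic` are hypothetical. Pulsar's `TopicName` class (in `org.apache.pulsar.common.naming`) already parses these parts, so a real implementation would more likely reuse it than hand-roll the parsing:

```java
// Hypothetical sketch: derives the tenant and both namespace forms from a
// fully qualified topic name such as "persistent://my-property/my-ns/my-topic".
final class NamespaceAttributes {
    final String tenant;
    final String qualifiedNamespace; // "my-property/my-ns", as used in the test
    final String localNamespace;     // "my-ns", as suggested in the review

    private NamespaceAttributes(String tenant, String localNamespace) {
        this.tenant = tenant;
        this.localNamespace = localNamespace;
        this.qualifiedNamespace = tenant + "/" + localNamespace;
    }

    static NamespaceAttributes fromTopic(String topic) {
        // Drop the domain prefix ("persistent://" or "non-persistent://") if present.
        int schemeEnd = topic.indexOf("://");
        String rest = schemeEnd >= 0 ? topic.substring(schemeEnd + 3) : topic;
        // Expect tenant/namespace/local-name; the limit keeps any further '/' in the local name.
        String[] parts = rest.split("/", 3);
        if (parts.length < 3) {
            throw new IllegalArgumentException("Not a tenant/namespace/topic name: " + topic);
        }
        return new NamespaceAttributes(parts[0], parts[1]);
    }
}
```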



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);

Review Comment:
   Technically it's an up-down-counter here.
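   A pending-message count goes up on every send and comes back down once the send completes (receipt or failure). A monotonic counter cannot express that. In the exported data both instruments show up as `LONG_SUM`, which is why `getCounterValue` accepts this metric. The two are told apart by `SumData.isMonotonic()`. Below is a minimal stdlib-only sketch of the two semantics. The class names are hypothetical and are not the OTel or Pulsar API:

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Hypothetical, stdlib-only model of the two OpenTelemetry sum instruments.
   // These classes are illustrative and are not the OTel or Pulsar API.
   class SumInstrumentsSketch {

       // Monotonic counter: only non-negative increments make sense.
       static final class Counter {
           private final AtomicLong value = new AtomicLong();

           void add(long delta) {
               if (delta < 0) {
                   // The OTel Java SDK drops negative counter increments with a warning;
                   // this sketch rejects them explicitly to make the contract visible.
                   throw new IllegalArgumentException("negative counter increment: " + delta);
               }
               value.addAndGet(delta);
           }

           long get() {
               return value.get();
           }
       }

       // Up-down counter: the value can rise and fall, e.g. messages awaiting a broker receipt.
       static final class UpDownCounter {
           private final AtomicLong value = new AtomicLong();

           void add(long delta) {
               value.addAndGet(delta);
           }

           long get() {
               return value.get();
           }
       }

       // Models the pending-message count for n sends that all complete successfully.
       static long pendingAfterSends(int n) {
           UpDownCounter pending = new UpDownCounter();
           for (int i = 0; i < n; i++) {
               pending.add(1);   // message enqueued, now pending
           }
           for (int i = 0; i < n; i++) {
               pending.add(-1);  // send completed, no longer pending
           }
           return pending.get();
       }
   }
   ```

   In the test, you could also assert that `md.getLongSumData().isMonotonic()` is `false`. That would check the instrument type as well as the value.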
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());

Review Comment:
   How about "The number of consumer sessions opened"?
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, "pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {
+            client.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .create();
+            fail("Should have failed the producer creation");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",
+                Attributes.builder().put("pulsar.failure.type", "tcp-failed").build()) >= 1);
+    }
+
+    @Test
+    public void testPublishFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(admin.getServiceUrl())
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .sendTimeout(3, TimeUnit.SECONDS)
+                .create();
+
+        // Make the client switch to non-existing broker to make publish fail
+        client.updateServiceUrl("pulsar://invalid-address:6650");
+
+
+        try {
+            producer.send("Hello");
+            fail("Should have failed to publish");
+        } catch (Exception e) {
+            // expected
+        }
+
+        var metrics = collectMetrics();
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsFailure = nsAttrs.toBuilder()
+                .put("success", false)
+                .build();
+
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 1, nsAttrsFailure);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure);
+    }
+
+    @Test
+    public void testConsumerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("Hello");
+        }
+
+        Thread.sleep(1000);
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        var metrics = collectMetrics();
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, nsAttrs);
+
+        Message<String> msg1 = consumer.receive();
+        consumer.acknowledge(msg1);
+
+        Message<String> msg2 = consumer.receive();
+        consumer.negativeAcknowledge(msg2);
+
+        /* Message<String> msg3 = */ consumer.receive();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 7, nsAttrs);

Review Comment:
   How about `pulsar.client.consumer.receive_queue.count`? 
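   These assertions track the client-side receiver queue. Ten messages are prefetched. After three `receive()` calls, the test expects 7 to remain, so the ack and the negative ack do not put anything back into the queue at that point. Below is a stdlib-only sketch of that bookkeeping. `ReceiverQueueSketch` is a hypothetical class, not the Pulsar implementation. Both count and size are derived from the queue contents:

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayDeque;
   import java.util.Deque;

   // Hypothetical model of a consumer receiver queue; not the Pulsar implementation.
   class ReceiverQueueSketch {
       private final Deque<byte[]> queue = new ArrayDeque<>();
       private long sizeBytes;

       // Broker pushes a message into the client-side queue (prefetch).
       void prefetch(String payload) {
           byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
           queue.addLast(bytes);
           sizeBytes += bytes.length;
       }

       // Application takes a message out; count and size both go down.
       String receive() {
           byte[] bytes = queue.pollFirst();
           if (bytes == null) {
               throw new IllegalStateException("receiver queue is empty");
           }
           sizeBytes -= bytes.length;
           return new String(bytes, StandardCharsets.UTF_8);
       }

       // Value a "receive_queue.count" style metric would report.
       long count() {
           return queue.size();
       }

       // Value a "receive_queue.size" style metric would report, in bytes.
       long sizeBytes() {
           return sizeBytes;
       }
   }
   ```

   Naming the metric after the receiver queue matches what is being measured: the messages currently sitting in the client-side queue, not the total number ever prefetched.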
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+        var metrics = collectMetrics();
+
+        assertTrue(getCounterValue(metrics, "pulsar.client.connections.failed",

Review Comment:
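   Alternatively, with AssertJ:
   ```
   import static org.assertj.core.api.Assertions.assertThat;

   assertThat(getCounterValue(metrics, "pulsar.client.connections.failed",
       Attributes.builder().put("pulsar.failure.type", "tcp-failed").build()))
       .isGreaterThanOrEqualTo(1L);
   ```
   If the assertion fails, the message reports the actual (smaller) value together with the expected minimum (1), instead of a bare boolean mismatch.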
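   If the file should stick to TestNG-style assertions, a small hand-rolled helper produces a similar message. In this sketch, `MetricAsserts.assertAtLeast` is a hypothetical name, not an existing TestNG or Pulsar method:

   ```java
   // Hypothetical helper: a lower-bound check whose failure message carries
   // the metric name, the actual value and the expected minimum.
   final class MetricAsserts {
       private MetricAsserts() {
       }

       static void assertAtLeast(String metric, long actual, long min) {
           if (actual < min) {
               throw new AssertionError(
                       "metric " + metric + ": expected at least " + min + " but was " + actual);
           }
       }
   }
   ```

   In the test, the call would be `MetricAsserts.assertAtLeast("pulsar.client.connections.failed", getCounterValue(metrics, ...), 1)`.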



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 10, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, nsAttrs);
+
+        Message<String> msg1 = consumer.receive();
+        consumer.acknowledge(msg1);
+
+        Message<String> msg2 = consumer.receive();
+        consumer.negativeAcknowledge(msg2);
+
+        /* Message<String> msg3 = */ consumer.receive();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.count", 7, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.consumer.prefetched.size", "hello".length() * 7, nsAttrs);
+
+        // Let msg3 to reach ack-timeout
+        Thread.sleep(3000);

Review Comment:
   Is this immediate, or do some background threads need to kick in to mark it as ack-timed-out? Asking to make sure we won't run into flakiness. This can be prevented with Awaitility.
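The reviewer suggests polling instead of sleeping. Below is a JDK-only sketch of that approach. Rather than a fixed `Thread.sleep(3000)`, it re-checks a condition until the condition holds or a deadline passes. In the test, the condition would be that the ack-timeout counter has reached its expected value. `WaitUtil.waitUntil` is a hypothetical helper and is not part of Pulsar or Awaitility.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: a JDK-only stand-in for Awaitility-style polling.
final class WaitUtil {
    private WaitUtil() {
    }

    /**
     * Re-evaluates {@code condition} every {@code pollInterval} until it returns true
     * or {@code timeout} elapses.
     *
     * @return true if the condition held before the deadline, false on timeout or interrupt
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // the condition never became true in time
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
                return false;
            }
        }
    }
}
```

If Awaitility is on the test classpath, the rough equivalent is `Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> ...)` with the metric assertion inside the lambda.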



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,8 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);

Review Comment:
   Maybe add a small Javadoc hinting at where I should obtain an `OpenTelemetry` instance, or what it is used for?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());
+        consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
+                "Counter of sessions closed", topic, Attributes.empty());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received.count", Unit.Messages,

Review Comment:
   OK, now I see it: this is the metric I missed earlier in the test.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",

Review Comment:
   Just to be clear: this one measures the latency of each single message, while the RPC one measures the latency of the Send command RPC?
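Here is a minimal model of the distinction being asked about. It assumes the descriptions in the `ProducerImpl` hunk are accurate. Under that assumption, the message-level duration starts when the application calls `send()`, so it includes client batching time. The RPC duration starts only when the Send command is written. `PublishTimings` and its fields are hypothetical names used for illustration.

```java
// Hypothetical model of the two publish latencies; all timestamps are in nanoseconds.
final class PublishTimings {
    final long createdAtNanos;   // when the application called send()
    long rpcSentAtNanos = -1;    // when the Send command carrying the message was written
    long ackedAtNanos = -1;      // when the broker's send receipt arrived

    PublishTimings(long createdAtNanos) {
        this.createdAtNanos = createdAtNanos;
    }

    /** Latency seen by the application: includes time spent waiting to be batched and sent. */
    long messageLatencyNanos() {
        return ackedAtNanos - createdAtNanos;
    }

    /** Latency of the Send command RPC only: from write to receipt. */
    long rpcLatencyNanos() {
        return ackedAtNanos - rpcSentAtNanos;
    }
}
```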



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs);
+
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "topic")
+                        .put("success", true)
+                        .build());
+        assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1,
+                Attributes.builder()
+                        .put("pulsar.lookup.transport-type", "binary")
+                        .put("pulsar.lookup.type", "metadata")
+                        .put("success", true)
+                        .build());
+
+        assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess);
+        assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess);
+        assertCounterValue(metrics, "pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs);
+
+
+        assertCounterValue(metrics, "pulsar.client.producer.opened", 1, nsAttrs);
+
+        producer.close();
+        client.close();
+
+        metrics = collectMetrics();
+        assertCounterValue(metrics, "pulsar.client.producer.closed", 1, nsAttrs);
+        assertCounterValue(metrics, "pulsar.client.connections.closed", 1, Attributes.empty());
+    }
+
+    @Test
+    public void testConnectionsFailedMetrics() throws Exception {
+        String topic = newTopicName();
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://invalid-pulsar-address:1234")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .openTelemetry(otel)
+                .build();
+
+        try {

Review Comment:
   If you want, there is also AssertJ's `assertThatThrownBy`:
   ```
   import static org.assertj.core.api.Assertions.assertThatThrownBy;
   
   assertThatThrownBy(() -> {
       client.newProducer(Schema.STRING)
           .topic(topic)
           .create();
   })
   .isInstanceOf(Exception.class);
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());

Review Comment:
   I would remove that.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("pulsar.lookup.transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration",

Review Comment:
   I have to say it doesn't look good to build the same instrument 4 times, repeating the name, unit, and description. It is bound to confuse.
   Let's at least build the instrument once, and then perhaps build a wrapper around it that couples the attributes with the instrument. I'm not a big fan of this (I prefer using attributes directly), but it's OK.
   Also, the built-in builder is much more readable: you know what each string is, whether it's a description, a unit, etc.
   I'm not afraid future developers will forget anything, since they are required to add their metrics and their attributes to the client docs, and there it will immediately be visible if they forgot something: unit, description, etc.
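Below is a sketch of the "build the instrument once, then wrap it" suggestion. It uses plain JDK types in place of the OpenTelemetry API. The name, unit, and description are declared a single time on `CountingHistogram`, and each `BoundHistogram` carries only its fixed attribute set. Both classes are hypothetical. A real implementation would wrap an OpenTelemetry histogram and its `Attributes` rather than a `Map`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Stand-in for ONE histogram instrument: name, unit and description are declared once.
// It only tracks sample counts per attribute set (no buckets), which is enough for the sketch.
final class CountingHistogram {
    final String name;
    final String unit;
    final String description;
    private final Map<Map<String, String>, LongAdder> samples = new ConcurrentHashMap<>();

    CountingHistogram(String name, String unit, String description) {
        this.name = name;
        this.unit = unit;
        this.description = description;
    }

    void record(double value, Map<String, String> attributes) {
        // value is ignored here; a real histogram would also place it in a bucket
        samples.computeIfAbsent(Map.copyOf(attributes), k -> new LongAdder()).increment();
    }

    long count(Map<String, String> attributes) {
        LongAdder adder = samples.get(attributes);
        return adder == null ? 0 : adder.sum();
    }
}

// Thin wrapper that couples a fixed attribute set with the shared instrument.
final class BoundHistogram {
    private final CountingHistogram histogram;
    private final Map<String, String> attributes;

    BoundHistogram(CountingHistogram histogram, Map<String, String> attributes) {
        this.histogram = histogram;
        this.attributes = Map.copyOf(attributes);
    }

    void record(double value) {
        histogram.record(value, attributes);
    }
}
```

The lookup service would then create one `CountingHistogram` and hand out one `BoundHistogram` per lookup type. The instrument definition is never repeated, and call sites still cannot forget their attributes.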



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes client batching time", topic,
+                Attributes.empty());
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic,
+                Attributes.empty());
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size",
+                Unit.Bytes, "The number of bytes published", topic, Attributes.empty());
+        pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.count", Unit.Messages,
+                "Pending messages for this producer", topic, Attributes.empty());
+        pendingBytesCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.size", Unit.Bytes,
+                "Pending bytes for this producer", topic, Attributes.empty());

Review Comment:
   "The size of the messages in the producer internal queue, waiting to be sent"

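Below is a minimal model of what the suggested description implies for the two up-down counters. It draws on two pieces of this thread. The `sendComplete` hunk decrements both counters when the send completes. `testPublishFailedMetrics` expects both counters to be back at 0 after a failed send. `PendingQueueGauges` is a hypothetical name.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical model of the pending-count / pending-size up-down counters:
// both go up on enqueue and come back down when the send completes,
// regardless of whether it succeeded or failed.
final class PendingQueueGauges {
    private final LongAdder pendingMessages = new LongAdder();
    private final LongAdder pendingBytes = new LongAdder();

    /** The message was added to the producer's internal send queue. */
    void onEnqueued(int msgSize) {
        pendingMessages.increment();
        pendingBytes.add(msgSize);
    }

    /** The send callback fired (success or failure): the message left the queue. */
    void onSendComplete(int msgSize) {
        pendingMessages.decrement();
        pendingBytes.add(-msgSize);
    }

    long pendingCount() {
        return pendingMessages.sum();
    }

    long pendingSize() {
        return pendingBytes.sum();
    }
}
```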


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -356,15 +396,22 @@ public MessageImpl<?> getNextMessage() {
 
             @Override
             public void sendComplete(Exception e) {
+                long latencyNanos = System.nanoTime() - createdAt;
+                pendingMessagesCounter.decrement();
+                pendingBytesCounter.subtract(msgSize);

Review Comment:
   `pendingBytesCounter` --> `pendingBytesUpDownCounter`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes client batching time", topic,
+                Attributes.empty());
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic,
+                Attributes.empty());
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size",

Review Comment:
   Is this counted only after the messages have been published successfully?
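The thread does not answer this question. Below is a sketch of one possible semantics, stated as an assumption. Bytes are added to the published-bytes counter only when the send callback reports success. Every completed send is still recorded under the `success` attribute that the tests filter on. The class and method names are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical recorder: published bytes are counted only for successful sends,
// while every completed send is tallied under a success=true/false split.
final class PublishOutcomeRecorder {
    private final LongAdder publishedBytes = new LongAdder();
    private final LongAdder succeeded = new LongAdder();
    private final LongAdder failed = new LongAdder();

    /** Mirrors the shape of sendComplete(Exception e): e == null means the broker acked. */
    void sendComplete(Exception e, int msgSize) {
        if (e == null) {
            succeeded.increment();
            publishedBytes.add(msgSize);
        } else {
            failed.increment();
        }
    }

    long publishedBytes() {
        return publishedBytes.sum();
    }

    long succeeded() {
        return succeeded.sum();
    }

    long failed() {
        return failed.sum();
    }
}
```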



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes client batching time", topic,
+                Attributes.empty());
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic,
+                Attributes.empty());
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size",
+                Unit.Bytes, "The number of bytes published", topic, Attributes.empty());
+        pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.count", Unit.Messages,
+                "Pending messages for this producer", topic, Attributes.empty());

Review Comment:
   "The number of messages in the producer internal send queue, waiting to be sent"



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java:
##########
@@ -60,11 +64,31 @@ public class HttpLookupService implements LookupService {
     private static final String BasePathV1 = "lookup/v2/destination/";
     private static final String BasePathV2 = "lookup/v2/topic/";
 
-    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
+    private final LatencyHistogram histoGetBroker;
+    private final LatencyHistogram histoGetTopicMetadata;
+    private final LatencyHistogram histoGetSchema;
+    private final LatencyHistogram histoListTopics;
+
+    public HttpLookupService(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
         this.httpClient = new HttpClient(conf, eventLoopGroup);
         this.useTls = conf.isUseTls();
         this.listenerName = conf.getListenerName();
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("pulsar.lookup.transport-type"), "binary");

Review Comment:
   The class name is `HttpLookupService`, yet the transport type here is "binary".
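`HttpLookupService` only serves HTTP service URLs, so the smallest fix would be to hard-code a different value there, for example `http`. If the value should instead be derived in one place, a helper could map the service URL scheme to the attribute value. Both the `http` value and the `LookupTransport` helper are assumptions. Neither is defined in the PR.

```java
import java.util.Locale;

// Hypothetical helper: derive the "pulsar.lookup.transport-type" value from the
// service URL scheme instead of hard-coding "binary" in both lookup services.
final class LookupTransport {
    private LookupTransport() {
    }

    static String transportType(String serviceUrl) {
        int idx = serviceUrl.indexOf("://");
        if (idx <= 0) {
            throw new IllegalArgumentException("Missing scheme in service URL: " + serviceUrl);
        }
        String scheme = serviceUrl.substring(0, idx).toLowerCase(Locale.ROOT);
        switch (scheme) {
            case "http":
            case "https":
                return "http";
            case "pulsar":
            case "pulsar+ssl":
                return "binary";
            default:
                throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
    }
}
```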



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java:
##########
@@ -114,6 +121,11 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa
         ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
         this.readLock = readWriteLock.readLock();
         this.writeLock = readWriteLock.writeLock();
+
+        InstrumentProvider ip = client.instrumentProvider();
+        consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.message.ack.timeout", Unit.Messages,
+                "Number of ack timeouts events", consumerBase.getTopic(), Attributes.empty());

Review Comment:
   The description is in "events", but the unit is messages.
   I suggest: "The number of messages that were not acknowledged within the configured timeout period, and hence were requested by the client to be redelivered"
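Below is a sketch of a counter whose unit and description both refer to messages. Each timeout check adds the number of messages whose ack deadline expired, rather than 1 per timeout event. The sketch assumes the tracker can hand over the set of expired message IDs at that point. `AckTimeoutCounter` is a hypothetical name, and this thread does not show the `UnAckedMessageTracker` internals.

```java
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical counter with unit = messages: each timeout check adds the number of
// messages whose ack deadline passed (and that will be requested for redelivery),
// instead of adding 1 per timeout "event".
final class AckTimeoutCounter {
    private final LongAdder timedOutMessages = new LongAdder();

    void onAckTimeout(Set<?> expiredMessageIds) {
        timedOutMessages.add(expiredMessageIds.size());
    }

    long value() {
        return timedOutMessages.sum();
    }
}
```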



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1517200280


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,

Review Comment:
   I would vehemently disagree on the "readable" aspect of it :D 





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1518210597


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());

Review Comment:
   These are all kinds of "lookups" done by the client. The user doesn't necessarily need to differentiate them, though that remains possible through the different attributes.
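The sketch below shows two ways a user could read a single lookup histogram. One sums over every attribute set to count all lookups. The other filters on the lookup type. It uses plain maps in place of exported metric data, and it uses the `pulsar.lookup.type` key that appears in the tests, whereas the hunk above still uses the older `type` key. `LookupCounts` is hypothetical.

```java
import java.util.Map;

// Hypothetical query helpers over per-attribute-set sample counts of a single
// lookup-duration histogram.
final class LookupCounts {
    private LookupCounts() {
    }

    /** All lookups, regardless of type: sum over every attribute set. */
    static long total(Map<Map<String, String>, Long> countsByAttributes) {
        return countsByAttributes.values().stream().mapToLong(Long::longValue).sum();
    }

    /** Only lookups of one type, selected through the "pulsar.lookup.type" attribute. */
    static long forType(Map<Map<String, String>, Long> countsByAttributes, String type) {
        return countsByAttributes.entrySet().stream()
                .filter(e -> type.equals(e.getKey().get("pulsar.lookup.type")))
                .mapToLong(e -> e.getValue())
                .sum();
    }
}
```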





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513100988


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   The problem is that the attributes will not be as simple (e.g., adding topic labels), and there is quite a bit of boilerplate to add.
   
   The other concern is making sure we have a consistent way to define the instruments (e.g., unit, description, latency bucket breakdown) and that attributes are always passed in, without worrying that some part of the code forgot to pass them.
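One way to get both properties is to have the provider bind name, unit, description, and attributes when the instrument is created, so recording sites only pass a value. The sketch below is a hypothetical, JDK-only illustration of that pattern (`InstrumentProviderSketch` and `BoundCounter` are invented names, not the PR's `InstrumentProvider`); with the plain OpenTelemetry API, the caller would instead hold the `Attributes` and supply them on every `LongCounter.add(value, attributes)` call.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: the provider fixes name, unit, description and
// attributes when the instrument is created, so recording sites cannot
// forget to pass attributes.
class InstrumentProviderSketch {

    enum Unit { Bytes, Messages, Sessions }

    static final class BoundCounter {
        final String name;
        final Unit unit;
        final String description;
        final Map<String, String> attributes;
        private final LongAdder value = new LongAdder();

        private BoundCounter(String name, Unit unit, String description, Map<String, String> attributes) {
            this.name = name;
            this.unit = unit;
            this.description = description;
            this.attributes = attributes;
        }

        // Call sites only supply the increment; attributes are already bound.
        void add(long delta) {
            if (delta < 0) {
                throw new IllegalArgumentException("counters are monotonic");
            }
            value.add(delta);
        }

        long get() {
            return value.sum();
        }
    }

    BoundCounter newCounter(String name, Unit unit, String description, Map<String, String> attributes) {
        Objects.requireNonNull(name, "name");
        Objects.requireNonNull(unit, "unit");
        Objects.requireNonNull(description, "description");
        Objects.requireNonNull(attributes, "attributes");
        // Immutable copy, so later changes to the caller's map do not leak in.
        return new BoundCounter(name, unit, description,
                Collections.unmodifiableMap(new TreeMap<>(attributes)));
    }

    public static void main(String[] args) {
        InstrumentProviderSketch ip = new InstrumentProviderSketch();
        BoundCounter published = ip.newCounter("pulsar.client.producer.message.send.size",
                Unit.Bytes, "Bytes published",
                Map.of("pulsar.tenant", "my-property", "pulsar.namespace", "my-property/my-ns"));
        published.add(128);
        published.add(64);
        System.out.println(published.name + " " + published.get() + " " + published.attributes);
    }
}
```

Because `add` takes no attributes, a call site that forgets them cannot compile, and every instrument is forced through one factory that enforces unit and description.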





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513267164


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());

Review Comment:
   All of these are issued by the `LookupService`.





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513274697


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java:
##########
@@ -137,7 +139,7 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
         conf.setMaxLookupRedirects(10);
 
         @Cleanup
-        LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
+        LookupService lookupService = useHttp ? new HttpLookupService(new InstrumentProvider(new ClientConfigurationData()), conf, eventExecutors) :

Review Comment:
   I was only using this constructor in tests, though I'm refactoring it out.





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1515889914


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());

Review Comment:
   Since we are providing OTel metrics, can we make them more detailed?
   The metrics at the topic level are a bit coarse-grained.





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1516559196


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,15 +283,35 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
+                "Publish latency experienced by the application, includes client batching time", topic,
+                Attributes.empty());
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic,
+                Attributes.empty());
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size",

Review Comment:
   Yes





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1517199911


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")

Review Comment:
   No, the convention everywhere else in Pulsar is that we always use the qualified name for topics and namespaces.
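Under that convention the namespace attribute is `tenant/namespace`, as in the test's expected `my-property/my-ns`. A hypothetical JDK-only helper that derives both attributes from a full topic name might look like this; it assumes the v2 `domain://tenant/namespace/topic` format and ignores legacy cluster-qualified names, and the `TopicName` import in `MetricsUtil` suggests the real code relies on Pulsar's own parser instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds namespace-level attributes from a fully
// qualified topic name such as "persistent://my-property/my-ns/my-topic".
// Assumes the v2 format (domain://tenant/namespace/local-name).
class TopicAttributesSketch {

    static Map<String, String> namespaceAttributes(String fullTopicName) {
        int sep = fullTopicName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("not fully qualified: " + fullTopicName);
        }
        // "my-property/my-ns/my-topic" -> ["my-property", "my-ns", "my-topic"]
        String[] parts = fullTopicName.substring(sep + 3).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + fullTopicName);
        }
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("pulsar.tenant", parts[0]);
        // Qualified namespace name, following the convention described above.
        attrs.put("pulsar.namespace", parts[0] + "/" + parts[1]);
        return attrs;
    }

    public static void main(String[] args) {
        // prints "{pulsar.tenant=my-property, pulsar.namespace=my-property/my-ns}"
        System.out.println(namespaceAttributes("persistent://my-property/my-ns/my-topic"));
    }
}
```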





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#issuecomment-1996290772

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22179?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `91.08434%` with `37 lines` in your changes missing coverage. Please review.
   > Project coverage is 73.64%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`980bae3`)](https://app.codecov.io/gh/apache/pulsar/pull/22179?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 48 commits behind head on master.
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #22179      +/-   ##
   ============================================
   + Coverage     73.57%   73.64%   +0.07%     
   - Complexity    32624    32783     +159     
   ============================================
     Files          1877     1885       +8     
     Lines        139502   140057     +555     
     Branches      15299    15352      +53     
   ============================================
   + Hits         102638   103151     +513     
   - Misses        28908    28930      +22     
   - Partials       7956     7976      +20     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22179/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22179/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `26.76% <60.96%> (+2.17%)` | :arrow_up: |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/22179/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.62% <54.93%> (+0.30%)` | :arrow_up: |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22179/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `72.89% <91.08%> (+0.05%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pulsar/pull/22179?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/pulsar/client/api/ConsumerStats.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jbGllbnQvYXBpL0NvbnN1bWVyU3RhdHMuamF2YQ==) | `0.00% <ø> (ø)` | |
   | [...va/org/apache/pulsar/client/api/ProducerStats.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jbGllbnQvYXBpL1Byb2R1Y2VyU3RhdHMuamF2YQ==) | `0.00% <ø> (ø)` | |
   | [.../pulsar/client/impl/BatchMessageContainerImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0JhdGNoTWVzc2FnZUNvbnRhaW5lckltcGwuamF2YQ==) | `80.89% <100.00%> (ø)` | |
   | [...e/pulsar/client/impl/BinaryProtoLookupService.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0JpbmFyeVByb3RvTG9va3VwU2VydmljZS5qYXZh) | `84.36% <100.00%> (+1.82%)` | :arrow_up: |
   | [...g/apache/pulsar/client/impl/ClientBuilderImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NsaWVudEJ1aWxkZXJJbXBsLmphdmE=) | `86.16% <100.00%> (+0.17%)` | :arrow_up: |
   | [.../java/org/apache/pulsar/client/impl/ClientCnx.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NsaWVudENueC5qYXZh) | `71.98% <100.00%> (+0.20%)` | :arrow_up: |
   | [.../org/apache/pulsar/client/impl/ConnectionPool.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0Nvbm5lY3Rpb25Qb29sLmphdmE=) | `76.44% <100.00%> (+1.92%)` | :arrow_up: |
   | [...va/org/apache/pulsar/client/impl/ConsumerImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVySW1wbC5qYXZh) | `77.96% <100.00%> (+0.39%)` | :arrow_up: |
   | [...ache/pulsar/client/impl/UnAckedMessageTracker.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1VuQWNrZWRNZXNzYWdlVHJhY2tlci5qYXZh) | `92.06% <100.00%> (+0.46%)` | :arrow_up: |
   | [...lsar/client/impl/conf/ClientConfigurationData.java](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL2NvbmYvQ2xpZW50Q29uZmlndXJhdGlvbkRhdGEuamF2YQ==) | `96.72% <100.00%> (+0.02%)` | :arrow_up: |
   | ... and [11 more](https://app.codecov.io/gh/apache/pulsar/pull/22179?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   ... and [74 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22179/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   </details>
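As a rough check on the patch figure: if 37 patch lines lack coverage and patch coverage is 91.08434%, the patch has about 37 / (1 - 0.9108434), or 415, coverable lines, of which 378 are covered. This assumes Codecov counts partially covered lines against coverage; the 415/378 split is inferred here and is not stated in the report. `PatchCoverageCheck` is a hypothetical helper.

```java
// Hypothetical helper: recomputes patch coverage from covered and missing
// line counts. The 415/378 totals are derived from the report's
// "91.08434% with 37 lines missing", not stated in it.
class PatchCoverageCheck {

    static double coveragePercent(long covered, long missing) {
        return 100.0 * covered / (covered + missing);
    }

    // Solve missing / total = 1 - pct / 100 for total, rounded to whole lines.
    static long impliedTotalLines(long missing, double pct) {
        return Math.round(missing / (1.0 - pct / 100.0));
    }

    public static void main(String[] args) {
        long total = impliedTotalLines(37, 91.08434);
        // prints "total=415 covered=378 pct=91.08434"
        System.out.printf("total=%d covered=%d pct=%.5f%n", total, total - 37,
                coveragePercent(total - 37, 37));
    }
}
```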




Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "asafm (via GitHub)" <gi...@apache.org>.
asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1515708519


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());

Review Comment:
   Yes, but from the user's point of view, they don't know about `LookupService`. They probably know there is a binary protocol, and methods (commands) invoked from the client. Only developers would know about `LookupService`, no?
   





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513081438


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
+                Unit.Bytes, "Bytes published", attrs);
+        pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.messages.count", Unit.Messages,
+                "Pending messages for this producer", attrs);
+        pendingBytesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.count", Unit.Bytes,
+                "Pending bytes for this producer", attrs);
+        producersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,

Review Comment:
   Yes, changed them to separate metrics





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511767392


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+                "Counter of connections opened", Attributes.empty());
+        this.connectionsClosedCounter = instrumentProvider.newCounter("pulsar.client.connections.closed", Unit.Connections,

Review Comment:
   I don't think these are going to be interesting: there was never a moment in which I'd have said that I really needed that information. I'd leave them out, at least for now.





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511853751


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);
+        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,

Review Comment:
   Ok, it looks good to standardize on count/size.
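The diff above registers both `Unit.Messages` and `Unit.Bytes` under `pulsar.client.received`, which is the kind of collision a count/size convention avoids. A minimal sketch, assuming a `.count`/`.size` suffix rule in the spirit of names like `pulsar.client.producer.message.send.size` elsewhere in the PR; `MetricNamingSketch` and the base name `pulsar.client.consumer.message.received` are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a count/size naming convention (not from the PR).
class MetricNamingSketch {

    enum Unit { Messages, Bytes }

    // Message counters end in ".count", byte counters in ".size".
    static String nameFor(String base, Unit unit) {
        return unit == Unit.Messages ? base + ".count" : base + ".size";
    }

    // Returns the first metric name declared with two different units, or null if none.
    static String findUnitConflict(List<Map.Entry<String, Unit>> declarations) {
        Map<String, Unit> seen = new HashMap<>();
        for (Map.Entry<String, Unit> d : declarations) {
            Unit previous = seen.putIfAbsent(d.getKey(), d.getValue());
            if (previous != null && previous != d.getValue()) {
                return d.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Same name for messages and bytes, as in the diff: a conflict.
        List<Map.Entry<String, Unit>> before = List.of(
                Map.entry("pulsar.client.received", Unit.Messages),
                Map.entry("pulsar.client.received", Unit.Bytes));
        // Hypothetical base name with unit-derived suffixes: no conflict.
        String base = "pulsar.client.consumer.message.received";
        List<Map.Entry<String, Unit>> after = List.of(
                Map.entry(nameFor(base, Unit.Messages), Unit.Messages),
                Map.entry(nameFor(base, Unit.Bytes), Unit.Bytes));
        System.out.println("before: " + findUnitConflict(before)); // pulsar.client.received
        System.out.println("after: " + findUnitConflict(after));   // null
    }
}
```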





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511503961


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
+
+    ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality metricsCardinality);

Review Comment:
   I couldn't find an example of how to get the `ExtendedDoubleCounterBuilder`.





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "KevinLiLu (via GitHub)" <gi...@apache.org>.
KevinLiLu commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1524118347


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.experimental.UtilityClass;
+import org.apache.pulsar.common.naming.TopicName;
+
+@UtilityClass
+public class MetricsUtil {
+
+    // By default, advice to use namespace level aggregation only
+    private static final List<AttributeKey<String>> DEFAULT_AGGREGATION_LABELS = Lists.newArrayList(
+            AttributeKey.stringKey("pulsar.tenant"),

Review Comment:
   Would it make sense to put the attribute values in a public enum/list so users can directly reference/see all possible values without having to read through the code? This might be helpful if a user wants to customize which attributes are used for aggregation. 
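A hypothetical sketch of that suggestion: a public enum of the attribute keys plus a helper that keeps only the selected ones, defaulting to tenant and namespace as the "namespace level aggregation only" comment in `MetricsUtil` describes. `PulsarAttributeKey`, `aggregate`, the `pulsar.topic` key, and the `my-sub` value are assumptions; `pulsar.tenant`, `pulsar.namespace`, and `pulsar.subscription` appear in the PR.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical public enum of the metric attribute keys.
// "pulsar.topic" is an assumed key name.
enum PulsarAttributeKey {
    TENANT("pulsar.tenant"),
    NAMESPACE("pulsar.namespace"),
    TOPIC("pulsar.topic"),
    SUBSCRIPTION("pulsar.subscription");

    final String key;

    PulsarAttributeKey(String key) {
        this.key = key;
    }

    // Namespace-level aggregation, matching the default described in MetricsUtil.
    static final Set<PulsarAttributeKey> DEFAULT_AGGREGATION = EnumSet.of(TENANT, NAMESPACE);

    // Keeps only the selected attributes, in enum order, dropping finer-grained ones.
    static Map<String, String> aggregate(Map<String, String> attributes, Set<PulsarAttributeKey> keep) {
        Map<String, String> result = new LinkedHashMap<>();
        for (PulsarAttributeKey k : values()) {
            String value = attributes.get(k.key);
            if (keep.contains(k) && value != null) {
                result.put(k.key, value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> consumerAttrs = Map.of(
                "pulsar.tenant", "my-property",
                "pulsar.namespace", "my-property/my-ns",
                "pulsar.topic", "persistent://my-property/my-ns/my-topic",
                "pulsar.subscription", "my-sub");
        // prints "{pulsar.tenant=my-property, pulsar.namespace=my-property/my-ns}"
        System.out.println(aggregate(consumerAttrs, DEFAULT_AGGREGATION));
        // A user opting into full per-subscription detail.
        System.out.println(aggregate(consumerAttrs, EnumSet.allOf(PulsarAttributeKey.class)));
    }
}
```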





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1524088873


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());

Review Comment:
   Good point. I've added `pulsar.subscription` attribute



Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1517202529


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientMetricsTest extends ProducerConsumerBase {
+
+    InMemoryMetricReader reader;
+    OpenTelemetry otel;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.reader = InMemoryMetricReader.create();
+        SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(reader)
+                .build();
+        this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private Map<String, MetricData> collectMetrics() {
+        Map<String, MetricData> metrics = new TreeMap<>();
+        for (MetricData md : reader.collectAllMetrics()) {
+            metrics.put(md.getName(), md);
+        }
+        return metrics;
+    }
+
+    private void assertCounterValue(Map<String, MetricData> metrics, String name, long expectedValue,
+                                    Attributes expectedAttributes) {
+        assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue);
+    }
+
+    private long getCounterValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.LONG_SUM);
+
+        for (var ex : md.getLongSumData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getValue();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    private void assertHistoCountValue(Map<String, MetricData> metrics, String name, long expectedCount,
+                                       Attributes expectedAttributes) {
+        assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount);
+    }
+
+    private long getHistoCountValue(Map<String, MetricData> metrics, String name,
+                                    Attributes expectedAttributes) {
+        MetricData md = metrics.get(name);
+        assertNotNull(md, "metric not found: " + name);
+        assertEquals(md.getType(), MetricDataType.HISTOGRAM);
+
+        for (var ex : md.getHistogramData().getPoints()) {
+            if (ex.getAttributes().equals(expectedAttributes)) {
+                return ex.getCount();
+            }
+        }
+
+        fail("metric attributes not found: " + expectedAttributes);
+        return -1;
+    }
+
+    @Test
+    public void testProducerMetrics() throws Exception {
+        String topic = newTopicName();
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .build();
+
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send("Hello");
+        }
+
+        Attributes nsAttrs = Attributes.builder()
+                .put("pulsar.tenant", "my-property")
+                .put("pulsar.namespace", "my-property/my-ns")
+                .build();
+        Attributes nsAttrsSuccess = nsAttrs.toBuilder()
+                .put("success", true)
+                .build();
+
+        var metrics = collectMetrics();
+        System.err.println("All metrics: " + metrics.keySet());
+
+        assertCounterValue(metrics, "pulsar.client.connections.opened", 1, Attributes.empty());
+        assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs);

Review Comment:
   The metric is collected as a `LONG_SUM` type which is the same as a counter, so the same validation works for both.



Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1525096102


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.experimental.UtilityClass;
+import org.apache.pulsar.common.naming.TopicName;
+
+@UtilityClass
+public class MetricsUtil {
+
+    // By default, advice to use namespace level aggregation only
+    private static final List<AttributeKey<String>> DEFAULT_AGGREGATION_LABELS = Lists.newArrayList(
+            AttributeKey.stringKey("pulsar.tenant"),

Review Comment:
   The attributes for each metric are specified in the PIP and will be included in the documentation.



Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511859164


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",

Review Comment:
   Renaming to `pulsar.client.producer.message.send.size`



Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511855297


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,

Review Comment:
   It would be a producer or consumer session



Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1513267595


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Sure, it makes sense.



Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "asafm (via GitHub)" <gi...@apache.org>.
asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1512329444


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,

Review Comment:
   Why not count the number of consumers or the number of producers, e.g. `pulsar.client.consumer.count` or `pulsar.client.producer.count`? Why is the word "session" in play here? What is the difference between a consumer session and a consumer?



Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1517201683


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+                "Counter of sessions opened", topic, Attributes.empty());

Review Comment:
   I'm not sure I understand the comment. The counter here is tagged with tenant/namespace/topic/partition attributes. By default, we only emit `tenant` and `namespace`, and users will be able to select the desired level.
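   To illustrate the "select the desired level" idea, here is a sketch assuming hypothetical levels `NAMESPACE`, `TOPIC` and `PARTITION`, where each level keeps the keys of the coarser levels plus its own. Key names other than `pulsar.tenant` and `pulsar.namespace` are assumptions, and this is not the PR's `MetricsUtil` code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical aggregation levels; NAMESPACE plays the role of the default.
// "pulsar.topic" and "pulsar.partition" are assumed key names.
enum AggregationLevel {
    NAMESPACE(List.of("pulsar.tenant", "pulsar.namespace")),
    TOPIC(List.of("pulsar.tenant", "pulsar.namespace", "pulsar.topic")),
    PARTITION(List.of("pulsar.tenant", "pulsar.namespace", "pulsar.topic", "pulsar.partition"));

    private final List<String> keys;

    AggregationLevel(List<String> keys) {
        this.keys = keys;
    }

    // Keep only the recorded attributes that belong to this level or a coarser one.
    Map<String, String> project(Map<String, String> recorded) {
        Map<String, String> kept = new LinkedHashMap<>();
        for (String k : keys) {
            String v = recorded.get(k);
            if (v != null) {
                kept.put(k, v);
            }
        }
        return kept;
    }
}
```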



Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1515879530


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import java.util.Optional;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+public class InstrumentProvider {
+
+    public static final InstrumentProvider NOOP = new InstrumentProvider(OpenTelemetry.noop());
+
+    private final Meter meter;
+
+    public InstrumentProvider(OpenTelemetry otel) {
+        if (otel == null) {
+            // By default, metrics are disabled, unless the OTel java agent is configured.
+            // This allows to enable metrics without any code change.

Review Comment:
   I have doubts about this: the OTel agent's library and the Pulsar client's library might be loaded by different classloaders, so even though a Java agent is installed on the application, `GlobalOpenTelemetry.get()` may return a no-op instance.
   Maybe I'm wrong, but it might happen.



Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1518209219


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,

Review Comment:
   👍 Fixed



Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "asafm (via GitHub)" <gi...@apache.org>.
asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1510298288


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
+
+    ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality metricsCardinality);

Review Comment:
   As discussed in the community bi-weekly meeting, I prefer to avoid any "add-ons" / wrappers around OpenTelemetry and use it as plainly as possible to keep things simple.
   
   For this point specifically, we can record the metrics using *all* the attributes we want, including the high-cardinality ones such as topic. OpenTelemetry [added](https://github.com/open-telemetry/opentelemetry-specification/pull/3546/files) an experimental metrics advice API to the specifications, allowing an instrument to specify the default attributes for the default view. This means we can record using namespace and topic attributes but only default to namespace.
   
   This was implemented in the Java SDK in a separate interface and artifact, which we can depend on.
   ```
   /** Extended {@link DoubleCounterBuilder} with experimental APIs. */
   public interface ExtendedDoubleCounterBuilder extends DoubleCounterBuilder {
   
     /**
      * Specify the attribute advice, which suggests the recommended set of attribute keys to be used
      * for this counter.
      */
     default ExtendedDoubleCounterBuilder setAttributesAdvice(List<AttributeKey<?>> attributes) {
       return this;
     }
   }
   ```
   
   The issue that started all of this in December 2022: https://github.com/open-telemetry/opentelemetry-specification/issues/3061.
   
   Given all of this, I would remove this line from the builder.
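   A stdlib-only sketch of the semantics behind attribute advice: every measurement is recorded with the full attribute set, and the view keeps only the advised keys, summing points that collapse onto the same key set. `AdvisedCounterView` is a hypothetical class, not part of the OpenTelemetry SDK.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory view: points that differ only in a non-advised key
// (e.g. the topic) are merged into one point whose value is their sum.
class AdvisedCounterView {
    private final List<String> advisedKeys;
    private final Map<Map<String, String>, Long> points = new HashMap<>();

    AdvisedCounterView(List<String> advisedKeys) {
        this.advisedKeys = advisedKeys;
    }

    // Record a counter increment with the full attribute set.
    void add(long value, Map<String, String> attributes) {
        Map<String, String> key = new TreeMap<>();
        for (String k : advisedKeys) {
            if (attributes.containsKey(k)) {
                key.put(k, attributes.get(k));
            }
        }
        points.merge(key, value, Long::sum);
    }

    // Snapshot of the aggregated points, keyed by the advised attributes only.
    Map<Map<String, String>, Long> collect() {
        return Map.copyOf(points);
    }
}
```

   Recording at full detail and letting the view drop keys is what lets users opt into topic-level series later without code changes in the client.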



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);

Review Comment:
   This supports the use case where the user chose OpenTelemetry to define/export their metrics, *and* they have done so explicitly since they have an `OpenTelemetry` instance they can pass to the Pulsar client.
   There is also the possibility of a user just installing the OTel Java agent (via JVM command-line parameters). In that case, the Java agent builds an `OpenTelemetry` instance and places it in `GlobalOpenTelemetry`. Hence, in the `ClientBuilder.build()` implementation, we can default to `GlobalOpenTelemetry.get()` when the user has not supplied an `OpenTelemetry` instance, and the Java agent use case will just work: metrics will be registered and exported.
   
   If the Java agent was not installed AND the `openTelemetry()` method wasn't used, defaulting to `GlobalOpenTelemetry.get()` will simply give us a no-op `OpenTelemetry`.
   
   If the user decided to use `GlobalOpenTelemetry` explicitly, that's also fine: we'll just use it, even if they didn't provide an `OpenTelemetry` instance explicitly.
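   The fallback order described above, sketched without the OTel dependency. `Telemetry` and `TelemetryResolver` are hypothetical stand-ins; with the real API the global supplier would be `GlobalOpenTelemetry::get`. Per dao-jun's classloader concern raised in this thread, the global lookup may still return a no-op instance even with the agent installed, so passing the instance explicitly remains the more reliable path.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for io.opentelemetry.api.OpenTelemetry so the sketch runs standalone.
interface Telemetry {
    String name();
}

final class TelemetryResolver {
    private TelemetryResolver() {
    }

    // 1. Prefer the instance passed explicitly to the client builder.
    // 2. Otherwise fall back to the global instance (set by the Java agent, if installed).
    //    The global supplier itself yields a no-op instance when nothing was registered.
    static Telemetry resolve(Telemetry explicit, Supplier<Telemetry> global) {
        return explicit != null ? explicit : global.get();
    }
}
```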
   
   



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -451,15 +452,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
     ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
 
     /**
-     * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
+     * Set the interval between each stat info <i>(default: disabled)</i> Stats will be activated with positive

Review Comment:
   On the server side, we have marked OpenTelemetry as Experimental since we might get some feedback and decide to name it differently. For example, we might change something on the server side and then decide to change it on the client side as well. If we mark the client-side OTel metrics as experimental too, we keep the freedom to change them.
   If we do so, maybe it's better to leave the 60-second default as is for now.
   



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java:
##########
@@ -29,9 +30,12 @@
  *
  * <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
  * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ * 
+ * @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats

Review Comment:
   I think it's very important to have documentation additions that will cover the following:
   1. The new metrics information, including name, unit, attributes, and type of instrument.
   2. How to use. Not many people are familiar with OpenTelemetry:
      - Linking to appropriate places for configuration. Maybe a short description of the possibilities?
      - Explanation that there are existing bridges if they are using Dropwizard or Micrometer.
      - Last resort: define your own MetricReader to hook into your own metrics system.
   
      All of those, in my opinion, should be in the documentation.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,

Review Comment:
   1. Why at the client level and not at the topic level? Having it per (broker, namespace, topic) makes sense, no?
   
   Now I see there was a line that filled in the attributes. Still, I'm all for setting them explicitly, so that when you read the code looking for a certain metric, its attributes are easy to find.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java:
##########
@@ -121,6 +123,18 @@ public ClientBuilder authentication(Authentication authentication) {
         return this;
     }
 
+    @Override
+    public ClientBuilder openTelemetry(OpenTelemetry openTelemetry) {
+        conf.setOpenTelemetry(openTelemetry);

Review Comment:
   Check later if there is a null check inside



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+                "Counter of connections opened", Attributes.empty());

Review Comment:
   For description, how about "The number of connections opened"?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Is the instrument measuring the duration of a lookup request? If so, it should be suffixed with `.duration`: `pulsar.client.lookup.duration`.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,

Review Comment:
   Both bytes and messages use the same instrument name.
   
   How about?
   - `pulsar.client.consumer.message.received.count`
   - `pulsar.client.consumer.message.received.size`
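   Why one name shouldn't carry two units: the OpenTelemetry specification treats instruments that share a name but differ in identifying fields (such as the unit) as a duplicate registration. `InstrumentRegistry` below is a hypothetical, simplified model of that check, not SDK code, and the unit strings are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: a name may only be registered with a single unit.
class InstrumentRegistry {
    private final Map<String, String> unitByName = new HashMap<>();

    // Returns true if accepted, false if the name is already taken with a different unit.
    boolean register(String name, String unit) {
        String existing = unitByName.putIfAbsent(name, unit);
        return existing == null || existing.equals(unit);
    }
}
```

   With the suggested `.count` / `.size` names, each unit gets its own instrument and no conflict arises.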
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);
+        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,
+                "Number of messages currently sitting in the consumer pre-fetch queue", attrs);
+        bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched", Unit.Bytes,
+                "Total number of bytes currently sitting in the consumer pre-fetch queue", attrs);
+
+        consumerAcksCounter = ip.newCounter("pulsar.client.consumer.ack", Unit.Messages,

Review Comment:
   1. The unit is Messages, but the instrument name suggests operations.
   2. How about:
      - `pulsar.client.consumer.message.ack`
      - `pulsar.client.consumer.message.nack`
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -1228,7 +1228,10 @@ public int getTotalIncomingMessages() {
 
     protected void clearIncomingMessages() {
         // release messages if they are pooled messages
-        incomingMessages.forEach(Message::release);
+        incomingMessages.forEach(msg -> {

Review Comment:
   Why expand the `Message::release` method reference into a lambda?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
+                Unit.Bytes, "Bytes published", attrs);
+        pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.messages.count", Unit.Messages,

Review Comment:
   I suggest: 
   - `pulsar.client.producer.message.pending.count`
   - `pulsar.client.producer.message.pending.size`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Note: should it be `pulsar.client.operations.lookup`, i.e. prefixed with `operations`, since we'll have more of those below?
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java:
##########
@@ -137,7 +139,7 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
         conf.setMaxLookupRedirects(10);
 
         @Cleanup
-        LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
+        LookupService lookupService = useHttp ? new HttpLookupService(new InstrumentProvider(new ClientConfigurationData()), conf, eventExecutors) :

Review Comment:
   Check what `InstrumentProvider` is.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+                "Counter of connections opened", Attributes.empty());
+        this.connectionsClosedCounter = instrumentProvider.newCounter("pulsar.client.connections.closed", Unit.Connections,

Review Comment:
   Shouldn't we include target details? Perhaps Cluster or Broker?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);
+        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,

Review Comment:
   "preteched" --> "prefetched"
   
   How about?
   - `pulsar.client.consumer.message.prefetched.count`
   - `pulsar.client.consumer.message.prefetched.size`
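A minimal sketch of the renamed prefetch instruments as OTel up-down counters, assuming they are incremented when a message enters the pre-fetch queue and decremented when it leaves. The class `PrefetchMetrics`, its hook methods, and the UCUM byte unit `By` for the size instrument are assumptions for illustration:

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical wiring of the renamed pre-fetch queue instruments.
class PrefetchMetrics {
    final LongUpDownCounter prefetchedCount;
    final LongUpDownCounter prefetchedSize;

    PrefetchMetrics(Meter meter) {
        prefetchedCount = meter.upDownCounterBuilder("pulsar.client.consumer.message.prefetched.count")
                .setUnit("{message}")
                .setDescription("Number of messages currently sitting in the consumer pre-fetch queue")
                .build();
        prefetchedSize = meter.upDownCounterBuilder("pulsar.client.consumer.message.prefetched.size")
                .setUnit("By")
                .setDescription("Total number of bytes currently sitting in the consumer pre-fetch queue")
                .build();
    }

    // A message was added to the pre-fetch queue.
    void onEnqueued(int payloadSize, Attributes attrs) {
        prefetchedCount.add(1, attrs);
        prefetchedSize.add(payloadSize, attrs);
    }

    // A message left the queue (handed to the application or released on clear).
    void onDequeued(int payloadSize, Attributes attrs) {
        prefetchedCount.add(-1, attrs);
        prefetchedSize.add(-payloadSize, attrs);
    }
}
```

Every dequeue path, including `clearIncomingMessages()`, has to call the decrement. Otherwise the up-down counters drift upward over time.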



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());
+        histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",

Review Comment:
   How about "Duration of lookup operations"?
   (I have a comment below asking how this can be, since these seem to be durations for 4 different types of binary commands.)



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+        this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+    }
+
+    public Attributes getAttributes(String topic) {
+        if (metricsCardinality == MetricsCardinality.None) {
+            return Attributes.empty();
+        }
+
+        AttributesBuilder ab = Attributes.builder();
+        TopicName tn = TopicName.get(topic);
+
+        switch (metricsCardinality) {
+            case Partition:
+                if (tn.isPartitioned()) {
+                    ab.put("partition", tn.getPartitionIndex());
+                }
+                // fallthrough
+            case Topic:
+                ab.put("topic", tn.getPartitionedTopicName());
+                // fallthrough
+            case Namespace:
+                ab.put("namespace", tn.getNamespace());
+                // fallthrough
+            case Tenant:
+                ab.put("tenant", tn.getTenant());

Review Comment:
   `pulsar.tenant`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());

Review Comment:
   When I look at the code, it doesn't look like all 4 of those cases launch a `/lookup` request; they map to different binary protocol commands. If that is the case, why name the instrument with a `lookup` prefix?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -239,10 +246,15 @@ String getDescription() {
 
 
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
-        this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
+        this(new InstrumentProvider(conf), conf, eventLoopGroup, Commands.getCurrentProtocolVersion());

Review Comment:
   Continuing the comment above about wrappers:
   Given we use the Meter directly, can we create a `PulsarClientOpenTelemetry` once when the client is built, have it create the Meter once, and then have all classes call `pulsarClientOpenTelemetry.getMeter()`?
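A minimal sketch of such a per-client holder, assuming it is constructed once alongside the client and handed to every component. The class name follows the comment. Its constructor and `getMeter()` body are illustrative, not an existing Pulsar API:

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;
import java.util.Objects;

// Hypothetical per-client holder, created once when the PulsarClient is built.
final class PulsarClientOpenTelemetry {
    private final Meter meter;

    PulsarClientOpenTelemetry(OpenTelemetry openTelemetry) {
        // The Meter is obtained exactly once for the whole client.
        this.meter = Objects.requireNonNull(openTelemetry, "openTelemetry")
                .getMeter("org.apache.pulsar.client");
    }

    // ClientCnx, ConnectionPool, producers and consumers would all call this.
    Meter getMeter() {
        return meter;
    }
}
```

With this holder, `ClientCnx` would no longer construct `new InstrumentProvider(conf)` itself. Each component would receive the shared holder and build its own instruments with the plain OTel builders.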



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());
+        histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "metadata").build());

Review Comment:
   Attributes can be mixed from different sources: Kubernetes (pod, namespace), the Pulsar client, etc. I believe Pulsar attributes should be prefixed with `pulsar.`; in this example, `pulsar.lookup.type` instead of `type`.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+        this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+    }
+
+    public Attributes getAttributes(String topic) {
+        if (metricsCardinality == MetricsCardinality.None) {
+            return Attributes.empty();
+        }
+
+        AttributesBuilder ab = Attributes.builder();
+        TopicName tn = TopicName.get(topic);
+
+        switch (metricsCardinality) {
+            case Partition:
+                if (tn.isPartitioned()) {
+                    ab.put("partition", tn.getPartitionIndex());
+                }
+                // fallthrough
+            case Topic:
+                ab.put("topic", tn.getPartitionedTopicName());
+                // fallthrough
+            case Namespace:
+                ab.put("namespace", tn.getNamespace());

Review Comment:
   `pulsar.namespace`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
                 }
             }, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
         }
+
+        connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,
+                "Counter of failed connections", Attributes.builder().put("type", "tcp-failed").build());

Review Comment:
   I suggest `type` --> `pulsar.failure.type`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Continuing my comment above: in my opinion, we should use the OTel API as directly as possible, without any wrappers.
   Wrappers are less readable: constructors are harder to reason about than builders.
   They also hide implementation details, which forces me to read the internals anyway, so they don't help readability. When do I read the metrics code? A lot of the time, as a user or developer, I search the code for a metric name and then try to figure out its attributes. When the code composing the attributes is split, for example some here and some hidden in the wrapper implementation, it is hard to assemble the full list of attributes. You are forced to grab a notepad and write down how everything works across classes, and only then do you know the attributes. The same goes for behavior: I don't know what `LatencyHistogram` means, but I know `Histogram` from OTel. The wrapper forces me to read its code.
   
   In this specific class, it is much simpler to have a single histogram instrument and multiple `Attributes`, and use them together: `histogram.record(value, getSchemaSuccess)`, where
   ```
   Attributes getSchemaSuccess = Attributes.builder()
       .put("type", "schema")
       .put("response", "success")
       .build();
   ```
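A fuller sketch of the single-histogram approach described in the comment above. One `DoubleHistogram` is recorded in seconds, and the operation and outcome are carried only by precomputed `Attributes`. The instrument name `pulsar.client.lookup.duration` and the class `LookupDurationRecorder` are hypothetical. The `type`/`response` keys are copied from the comment and would take a `pulsar.` prefix if that suggestion is adopted:

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical sketch: one histogram instrument; operation and outcome are attributes.
class LookupDurationRecorder {
    static final Attributes GET_SCHEMA_SUCCESS = Attributes.builder()
            .put("type", "schema")
            .put("response", "success")
            .build();
    static final Attributes GET_SCHEMA_FAILURE = Attributes.builder()
            .put("type", "schema")
            .put("response", "failure")
            .build();

    private final DoubleHistogram histogram;

    LookupDurationRecorder(Meter meter) {
        histogram = meter.histogramBuilder("pulsar.client.lookup.duration")
                .setUnit("s")
                .setDescription("Duration of lookup operations")
                .build();
    }

    // elapsedNanos comes from System.nanoTime() deltas; OTel durations are in seconds.
    void recordGetSchema(long elapsedNanos, boolean success) {
        double seconds = elapsedNanos / 1_000_000_000.0;
        histogram.record(seconds, success ? GET_SCHEMA_SUCCESS : GET_SCHEMA_FAILURE);
    }
}
```

Searching for the metric name now leads to one place where every attribute combination is visible.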
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -121,12 +152,19 @@ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
      *
      */
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
+        long startTime = System.nanoTime();
         final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
         try {
             return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
                 CompletableFuture<PartitionedTopicMetadata> newFuture =
                         getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
                 newFutureCreated.setValue(newFuture);
+                newFuture.thenRun(() -> {
+                    histoGetBroker.recordSuccess(System.nanoTime() - startTime);

Review Comment:
   Why not `histoGetTopicMetadata`?
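Besides the wrong histogram, the `thenRun` callback in the diff only fires on normal completion, so failed metadata lookups are never measured. A minimal sketch of a timing helper that covers both outcomes follows. It assumes a plain OTel `DoubleHistogram` instead of the PR's `LatencyHistogram` wrapper, and `LookupTiming.recordDuration` is a hypothetical name. In `getPartitionedTopicMetadata`, the caller would pass the metadata histogram or attributes, not the broker ones:

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: records how long an async lookup took, on whichever
// histogram/attributes the caller passes, for both success and failure.
final class LookupTiming {
    private LookupTiming() {
    }

    static <T> CompletableFuture<T> recordDuration(CompletableFuture<T> future, long startNanos,
                                                   DoubleHistogram histogram,
                                                   Attributes successAttrs, Attributes failureAttrs) {
        // whenComplete runs on both normal and exceptional completion and returns
        // a future that completes with the same result, or also fails.
        return future.whenComplete((result, error) -> {
            double seconds = (System.nanoTime() - startNanos) / 1_000_000_000.0;
            histogram.record(seconds, error == null ? successAttrs : failureAttrs);
        });
    }
}
```

`startNanos` is captured by the caller at the start of the operation, as `startTime` is in the diff, so the measurement includes all work done before the future is created.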



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
                 }
             }, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
         }
+
+        connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,
+                "Counter of failed connections", Attributes.builder().put("type", "tcp-failed").build());

Review Comment:
   Suggested description: "The number of failed connection attempts"



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);

Review Comment:
   "Number of messages received" --> "The number of messages explicitly received by the consumer, i.e., returned by calls to receive() or batchReceive()."



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
                 }
             }, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
         }
+
+        connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,

Review Comment:
   The unit should be `{connection}`, according to https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units.
   This comment applies to all `pulsar.client.connections.*` counters.
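A minimal sketch that combines the review suggestions for this counter: the `{connection}` unit, the description "The number of failed connection attempts", and `pulsar.failure.type` in place of `type`. The class `ConnectionMetrics` and its method are hypothetical:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical sketch of the failed-connections counter with the suggested unit and attribute.
class ConnectionMetrics {
    static final AttributeKey<String> FAILURE_TYPE = AttributeKey.stringKey("pulsar.failure.type");
    static final Attributes TCP_FAILED = Attributes.of(FAILURE_TYPE, "tcp-failed");

    final LongCounter failedCounter;

    ConnectionMetrics(Meter meter) {
        failedCounter = meter.counterBuilder("pulsar.client.connections.failed")
                .setUnit("{connection}")
                .setDescription("The number of failed connection attempts")
                .build();
    }

    // One failed TCP connection attempt.
    void onTcpConnectFailure() {
        failedCounter.add(1, TCP_FAILED);
    }
}
```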



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java:
##########
@@ -114,6 +121,12 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa
         ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
         this.readLock = readWriteLock.readLock();
         this.writeLock = readWriteLock.writeLock();
+
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(consumerBase.getTopic());
+        consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.ack.timeout", Unit.Messages,

Review Comment:
   In the description, we count events, but the unit is messages. Are we counting messages whose acknowledgment has timed out? I'm not sure I understand this metric.
   If messages are counted, then maybe we should use `pulsar.client.consumer.message.ack.timeout`.
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,

Review Comment:
   The variable is about how many consumers were opened, but we count sessions in the unit and name. What is the definition of a session?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+        this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+    }
+
+    public Attributes getAttributes(String topic) {
+        if (metricsCardinality == MetricsCardinality.None) {
+            return Attributes.empty();
+        }
+
+        AttributesBuilder ab = Attributes.builder();
+        TopicName tn = TopicName.get(topic);
+
+        switch (metricsCardinality) {
+            case Partition:
+                if (tn.isPartitioned()) {
+                    ab.put("partition", tn.getPartitionIndex());
+                }
+                // fallthrough
+            case Topic:
+                ab.put("topic", tn.getPartitionedTopicName());

Review Comment:
   `pulsar.topic`
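A minimal sketch of the cardinality switch from `InstrumentProvider.getAttributes` with the `pulsar.`-prefixed keys suggested in these comments. To keep it self-contained, it takes the already-parsed topic parts as parameters instead of a `TopicName`, and defines a local `Cardinality` enum that mirrors `MetricsCardinality`. `pulsar.partition` extrapolates the same prefix rule and was not named in the review:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;

// Local stand-in for org.apache.pulsar.client.api.MetricsCardinality.
enum Cardinality { None, Tenant, Namespace, Topic, Partition }

final class TopicAttributes {
    static final AttributeKey<String> TENANT = AttributeKey.stringKey("pulsar.tenant");
    static final AttributeKey<String> NAMESPACE = AttributeKey.stringKey("pulsar.namespace");
    static final AttributeKey<String> TOPIC = AttributeKey.stringKey("pulsar.topic");
    static final AttributeKey<Long> PARTITION = AttributeKey.longKey("pulsar.partition");

    // partitionIndex < 0 means the topic is not partitioned.
    static Attributes forTopic(Cardinality cardinality, String tenant, String namespace,
                               String topic, int partitionIndex) {
        if (cardinality == Cardinality.None) {
            return Attributes.empty();
        }
        AttributesBuilder ab = Attributes.builder();
        switch (cardinality) {
            case Partition:
                if (partitionIndex >= 0) {
                    ab.put(PARTITION, (long) partitionIndex);
                }
                // fallthrough
            case Topic:
                ab.put(TOPIC, topic);
                // fallthrough
            case Namespace:
                ab.put(NAMESPACE, namespace);
                // fallthrough
            case Tenant:
                ab.put(TENANT, tenant);
                break;
            default:
                break;
        }
        return ab.build();
    }
}
```

Declaring the keys as typed `AttributeKey` constants gives a reader who searches for `pulsar.topic` a single definition to find.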



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",

Review Comment:
   I suggest `pulsar.client.producer.rpc.send.size`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java:
##########
@@ -395,6 +397,11 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     )
     private String description;
 
+
+    private transient OpenTelemetry openTelemetry;

Review Comment:
   1. Is it transient because we're serializing this class?
   2. How can a configuration class, which should be a data class, contain an instance? Why isn't this kept at the builder level?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");

Review Comment:
   Since Pulsar's client is a library, as opposed to Pulsar Broker, we should also add the version to the meter. See: https://opentelemetry.io/docs/concepts/instrumentation/code-based/#configure-the-opentelemetry-api
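A minimal sketch of creating the client Meter with an instrumentation scope version through the standard `MeterProvider.meterBuilder` API. The factory class is hypothetical. In the real client, the version string would presumably come from `PulsarVersion.getVersion()`, which `ClientCnx` already uses:

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical factory: the Meter's instrumentation scope carries both name and version.
final class ClientMeterFactory {
    static final String SCOPE_NAME = "org.apache.pulsar.client";

    private ClientMeterFactory() {
    }

    static Meter createMeter(OpenTelemetry openTelemetry, String clientVersion) {
        return openTelemetry.getMeterProvider()
                .meterBuilder(SCOPE_NAME)
                .setInstrumentationVersion(clientVersion)
                .build();
    }
}
```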
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,

Review Comment:
   I asked in OTel Slack since it contradicts their guidelines, but I don't have anything better. I'll get back to you on that.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);
+        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,
+                "Number of messages currently sitting in the consumer pre-fetch queue", attrs);
+        bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched", Unit.Bytes,
+                "Total number of bytes currently sitting in the consumer pre-fetch queue", attrs);
+
+        consumerAcksCounter = ip.newCounter("pulsar.client.consumer.ack", Unit.Messages,
+                "Number of ack operations", attrs);

Review Comment:
   How about "The number of acknowledged messages"?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",

Review Comment:
   I suggest `pulsar.client.producer.send.duration`. Or if we wish to use rpc then `pulsar.client.publish.rpc.send.duration`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",

Review Comment:
   1. The term "latency" is not used in the places I checked in OTel.
   2. The term "publish" does not appear in `Producer.java`; hence, I think it's better to stick with "send". It also aligns well with Consumer "receive"
   
   I suggest 
   `pulsar.client.producer.message.send.duration`
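A minimal sketch of the suggested send-duration histogram, recorded in seconds following the OTel convention for durations. The class `ProducerSendMetrics` and its hook method are hypothetical. The start time is assumed to be captured when the application calls `send()`/`sendAsync()`, so the measurement includes client batching time:

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.concurrent.TimeUnit;

// Hypothetical producer instrument using "send" terminology and seconds.
class ProducerSendMetrics {
    static final String NAME = "pulsar.client.producer.message.send.duration";

    private final DoubleHistogram sendDuration;

    ProducerSendMetrics(Meter meter) {
        sendDuration = meter.histogramBuilder(NAME)
                .setUnit("s")
                .setDescription("Duration of a message send as seen by the application, including client batching time")
                .build();
    }

    // sendStartNanos is System.nanoTime() taken when the application called send().
    void onSendAcked(long sendStartNanos, Attributes attrs) {
        sendDuration.record(toSeconds(System.nanoTime() - sendStartNanos), attrs);
    }

    static double toSeconds(long nanos) {
        return nanos / (double) TimeUnit.SECONDS.toNanos(1);
    }
}
```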
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);

Review Comment:
   This should match the previous description, but for bytes.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java:
##########
@@ -60,11 +64,31 @@ public class HttpLookupService implements LookupService {
     private static final String BasePathV1 = "lookup/v2/destination/";
     private static final String BasePathV2 = "lookup/v2/topic/";
 
-    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
+    private final LatencyHistogram histoGetBroker;
+    private final LatencyHistogram histoGetTopicMetadata;
+    private final LatencyHistogram histoGetSchema;
+    private final LatencyHistogram histoListTopics;
+
+    public HttpLookupService(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
         this.httpClient = new HttpClient(conf, eventLoopGroup);
         this.useTls = conf.isUseTls();
         this.listenerName = conf.getListenerName();
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = instrumentProvider.newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Although OpenTelemetry supports creating instruments with the same name more than once, I think a metrics class dedicated to this should be created and used in both the binary and HTTP lookup services. It's less confusing for the reader, and especially for someone searching the code.
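A minimal sketch of such a dedicated class, assuming each lookup service passes its own transport type to the constructor. This would also avoid a slip visible in the diff, where `HttpLookupService` tags its measurements with `transport-type=binary`. The class name, the instrument name `pulsar.client.lookup.duration`, and the key `pulsar.lookup.transport-type` are hypothetical. `pulsar.lookup.type` follows an earlier review comment:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical single definition of the lookup instrument, shared by
// BinaryProtoLookupService ("binary") and HttpLookupService ("http").
final class LookupMetrics {
    static final String NAME = "pulsar.client.lookup.duration";
    static final AttributeKey<String> TRANSPORT_TYPE = AttributeKey.stringKey("pulsar.lookup.transport-type");
    static final AttributeKey<String> LOOKUP_TYPE = AttributeKey.stringKey("pulsar.lookup.type");

    private final DoubleHistogram histogram;
    private final String transportType;

    LookupMetrics(Meter meter, String transportType) {
        this.histogram = meter.histogramBuilder(NAME)
                .setUnit("s")
                .setDescription("Duration of lookup operations")
                .build();
        this.transportType = transportType;
    }

    // All attributes for a measurement are composed here, in one place.
    Attributes attributesFor(String lookupType) {
        return Attributes.of(TRANSPORT_TYPE, transportType, LOOKUP_TYPE, lookupType);
    }

    void record(String lookupType, double seconds) {
        histogram.record(seconds, attributesFor(lookupType));
    }
}
```

On hot paths, the per-type `Attributes` could be precomputed once per service instead of being built on every call.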
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
+                Unit.Bytes, "Bytes published", attrs);
+        pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.messages.count", Unit.Messages,
+                "Pending messages for this producer", attrs);
+        pendingBytesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.count", Unit.Bytes,
+                "Pending bytes for this producer", attrs);
+        producersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,

Review Comment:
   Seeing the same metric defined twice (producer and consumer) is confusing.
   Also, the rule of thumb is that aggregating a single instrument across all of its attributes should be meaningful. I'm not sure it is meaningful to sum producer and consumer sessions. Maybe each should have its own session metric?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+                "Counter of connections opened", Attributes.empty());

Review Comment:
   The same applies to the `closed` counter below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511533926


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -1228,7 +1228,10 @@ public int getTotalIncomingMessages() {
 
     protected void clearIncomingMessages() {
         // release messages if they are pooled messages
-        incomingMessages.forEach(Message::release);
+        incomingMessages.forEach(msg -> {

Review Comment:
   Leftover from an attempted change. Fixed.





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511771322


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,

Review Comment:
   I'm not sure I understand: the `attrs` will be at the topic, namespace, or whatever level is configured.
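
To illustrate the point, here is a hypothetical sketch of how attributes might be scoped by a configured cardinality level. The `Cardinality` enum, the `pulsar.*` attribute keys, and `TopicAttributes.forTopic` are assumptions made for illustration, not the Pulsar client's actual `getAttributes` implementation. The sketch also assumes a fully qualified topic name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical cardinality levels, loosely mirroring the idea behind MetricsCardinality.
enum Cardinality { NONE, TENANT, NAMESPACE, TOPIC }

final class TopicAttributes {
    /** Builds attributes for a fully qualified topic such as "persistent://tenant/ns/topic". */
    static Map<String, String> forTopic(String topic, Cardinality level) {
        Map<String, String> attrs = new LinkedHashMap<>();
        if (level == Cardinality.NONE) {
            return attrs; // no per-topic dimensions at all
        }
        // Drop the "persistent://" or "non-persistent://" domain prefix if present.
        int idx = topic.indexOf("://");
        String rest = idx >= 0 ? topic.substring(idx + 3) : topic;
        String[] parts = rest.split("/", 3); // tenant, namespace, local topic name
        attrs.put("pulsar.tenant", parts[0]);
        if (level.compareTo(Cardinality.NAMESPACE) >= 0) {
            attrs.put("pulsar.namespace", parts[0] + "/" + parts[1]);
        }
        if (level.compareTo(Cardinality.TOPIC) >= 0) {
            attrs.put("pulsar.topic", topic);
        }
        return attrs;
    }
}
```

Under this model, whatever the topic, the attribute set never gets finer than the configured level, which is what keeps the metric cardinality bounded.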



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511759704


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");

Review Comment:
   Is this using `MeterBuilder setInstrumentationVersion(String instrumentationScopeVersion);`?





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511842706


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Since the unit is `seconds`, the metric name gets changed (at least in the Prometheus exporter) to:
   
   ```
   # HELP pulsar_client_lookup_seconds Duration of lookup operations
   # TYPE pulsar_client_lookup_seconds histogram
   pulsar_client_lookup_seconds_bucket{otel_scope_name="org.apache.pulsar.client",otel_scope_version="3.3.0-SNAPSHOT",pulsar_lookup_transport_type="binary",pulsar_lookup_type="metadata",success="true",le="5.0E-4"} 0
   ```
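
The renaming can be approximated as follows. This is a simplified, hypothetical helper, not the exporter's code: it only knows three unit mappings (`s`, `ms`, `By`) and ignores other rules, such as the `_total` suffix typically added to counters.

```java
import java.util.Map;

// Hypothetical approximation of how an OTel-to-Prometheus exporter derives a
// Prometheus metric name from an OTel instrument name and unit.
final class PrometheusNaming {
    // Small, assumed subset of unit -> Prometheus suffix mappings.
    private static final Map<String, String> UNIT_SUFFIXES = Map.of(
            "s", "seconds",
            "ms", "milliseconds",
            "By", "bytes");

    static String toPrometheusName(String otelName, String unit) {
        // Dots (and any other characters Prometheus rejects) become underscores.
        String name = otelName.replaceAll("[^a-zA-Z0-9_:]", "_");
        // Units outside the table (including annotations like "{message}") add no suffix here.
        String suffix = unit == null ? null : UNIT_SUFFIXES.get(unit);
        if (suffix == null || name.endsWith("_" + suffix)) {
            return name;
        }
        return name + "_" + suffix;
    }
}
```

`PrometheusNaming.toPrometheusName("pulsar.client.lookup", "s")` yields `pulsar_client_lookup_seconds`; the exporter then appends series suffixes such as `_bucket` for histograms, giving the name in the output above.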





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1511850172


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,

Review Comment:
   The thing is that, with a different unit, the metrics will end up with different names (at least in the Prometheus exporter).
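
One way to surface such clashes while instruments are being defined is a small registration check. This is a hypothetical guard (not part of the Pulsar client or the OTel SDK), and the unit strings `{message}` and `By` are assumed values for `Unit.Messages` and `Unit.Bytes`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical guard: remembers the unit each instrument name was first
// registered with and rejects later registrations that change the unit.
final class InstrumentNameGuard {
    private final Map<String, String> unitsByName = new HashMap<>();

    /** Returns true if the (name, unit) pair is new or matches the earlier registration. */
    synchronized boolean register(String name, String unit) {
        String previous = unitsByName.putIfAbsent(name, unit);
        return previous == null || previous.equals(unit);
    }
}
```

Registering `pulsar.client.received` with `{message}` and then again with `By` returns `false` on the second call, which signals that the byte counter needs a distinct name.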





Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "asafm (via GitHub)" <gi...@apache.org>.
asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1512318835


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   I think the Prometheus exporter implements its own rules, conforming to the OpenMetrics format. If you ship directly to Datadog using OTLP, they may convert names with their own rules. You can also ship using OTLP to the Collector, and the Prometheus converter there will do its job.
   I think it's important that in OTel we keep the naming as it has been prescribed.
   





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "asafm (via GitHub)" <gi...@apache.org>.
asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1515712296


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,

Review Comment:
   Technically, all prefixes should be singular: `pulsar.client.connection.opened`. Only the metric name should be pluralized.
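
Applied to the connection counters, the convention could look like the hypothetical constants below (the class and field names are illustrative; the final names in the merged PR are not shown in this thread). Deriving both leaves from one singular namespace keeps `opened` and `closed` consistent.

```java
// Hypothetical constants illustrating the naming convention from the review:
// namespace segments stay singular, and "opened"/"closed" share one prefix.
final class ConnectionMetricNames {
    static final String NAMESPACE = "pulsar.client.connection";
    static final String OPENED = NAMESPACE + ".opened";
    static final String CLOSED = NAMESPACE + ".closed";

    private ConnectionMetricNames() {
        // constants holder, not instantiable
    }
}
```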





Re: [PR] [improve] WIP: PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1520147586


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import java.util.Optional;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+public class InstrumentProvider {
+
+    public static final InstrumentProvider NOOP = new InstrumentProvider(OpenTelemetry.noop());
+
+    private final Meter meter;
+
+    public InstrumentProvider(OpenTelemetry otel) {
+        if (otel == null) {
+            // By default, metrics are disabled, unless the OTel java agent is configured.
+            // This allows to enable metrics without any code change.

Review Comment:
   There is always the possibility of weird issues with different classloaders. In general, though, it does work: if you enable the OTel agent, you get Pulsar client metrics without passing the OpenTelemetry object when building the `PulsarClient` instance.
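
The fallback discussed here can be sketched generically. This is a hypothetical helper, not the `InstrumentProvider` code. The assumption is that, when no `OpenTelemetry` instance is passed, the constructor falls back to a global one, presumably via `GlobalOpenTelemetry::get`, which the diff imports.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical sketch of "explicit instance, else global" resolution.
final class OtelResolution {
    static <T> T resolve(T explicit, Supplier<T> globalFallback) {
        // Prefer the instance passed when building the client; otherwise ask the
        // global registry (for example, one installed by the OTel Java agent).
        return explicit != null ? explicit : Objects.requireNonNull(globalFallback.get());
    }
}
```

Keeping the global lookup behind a supplier means it only runs when nothing was passed explicitly, so an application-provided instance always wins.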





Re: [PR] [improve] PIP-342: OTel client metrics support [pulsar]

Posted by "merlimat (via GitHub)" <gi...@apache.org>.
merlimat merged PR #22179:
URL: https://github.com/apache/pulsar/pull/22179

