Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/29 12:13:16 UTC

[GitHub] [pulsar] asafm commented on a diff in pull request #17072: [monitoring][broker][metadata_store] add metrics for BatchMetadataStore

asafm commented on code in PR #17072:
URL: https://github.com/apache/pulsar/pull/17072#discussion_r983441705


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java:
##########
@@ -49,4 +50,9 @@ public Type getType() {
     public int size() {
         return path.length() + (data != null ? data.length : 0);
     }
+
+    @Override

Review Comment:
   I'm not a big fan of inheritance, but this might be a good case for moving this into a shared abstract class. What do you think?
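A minimal sketch of what that could look like, assuming the newly overridden method returns the same thing for every op type. The diff does not show the method, so a hypothetical creation timestamp (the kind of value a queue-wait histogram needs) stands in for it; `AbstractMetadataOp`, `createdAtNanos()`, `ageMillis()` and `SketchOpPut` are illustrative names only, not Pulsar's API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: AbstractMetadataOp, createdAtNanos() and ageMillis()
// are illustrative names, not part of Pulsar's MetadataOp API.
abstract class AbstractMetadataOp {
    // Captured once at construction and shared by every concrete op.
    private final long createdAtNanos = System.nanoTime();

    // Implemented once here instead of being overridden in OpPut, OpGet, etc.
    public final long createdAtNanos() {
        return createdAtNanos;
    }

    // Milliseconds elapsed since the op was created.
    public final long ageMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - createdAtNanos);
    }

    // Op-specific behaviour stays abstract.
    public abstract int size();
}

// Hypothetical stand-in for OpPut, keeping the size() logic shown in the diff.
final class SketchOpPut extends AbstractMetadataOp {
    private final String path;
    private final byte[] data;

    SketchOpPut(String path, byte[] data) {
        this.path = path;
        this.data = data;
    }

    @Override
    public int size() {
        return path.length() + (data != null ? data.length : 0);
    }
}
```

A `default` method on the `MetadataOp` interface would avoid inheritance, but it cannot hold per-instance state such as a timestamp, which is one argument for the abstract class in this case.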



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.metadata.impl.batching.MetadataOp;
+import org.jctools.queues.MessagePassingQueue;
+
+public final class BatchMetadataStoreStats implements AutoCloseable {
+    private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000};
+    private static final String NAME = "name";
+    private static final String TYPE = "type";
+    private static final String OP_READ = "read";
+    private static final String OP_WRITE = "write";
+
+    private static final Gauge QUEUEING_OPS = Gauge
+            .build("pulsar_batch_metadata_store_queueing_ops", "-")
+            .labelNames(NAME, TYPE)
+            .register();
+    private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
+            .build("pulsar_batch_metadata_store_executor_queue_size", "-")
+            .labelNames(NAME)
+            .register();
+    private static final Counter OVERFLOW_OPS = Counter
+            .build("pulsar_batch_metadata_store_overflow_ops" , "-")

Review Comment:
   This metric definitely needs a help text explaining it. Not everybody will understand that you're counting the number of operations executed directly, without going through the queue, because the queue was full.
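The help string is what appears on the `# HELP` line of the Prometheus text exposition, which is often the only documentation an operator sees. A minimal sketch that renders that line for the overflow counter with a candidate help string (the wording is only a suggestion, and `HelpLineSketch` is a hypothetical name), escaping backslashes and line feeds as the text format requires:

```java
// Hypothetical helper showing how a help string surfaces in the text exposition.
final class HelpLineSketch {
    // Candidate help text; the wording is a suggestion, not the PR's text.
    static final String OVERFLOW_OPS_HELP =
            "Number of metadata operations executed directly, without being queued, "
            + "because the batching queue was full";

    // Escape per the Prometheus text format: '\' becomes "\\" and a line feed becomes "\n".
    static String escapeHelp(String help) {
        return help.replace("\\", "\\\\").replace("\n", "\\n");
    }

    // Render the "# HELP" and "# TYPE" comment lines for one metric family.
    static String header(String name, String help, String type) {
        return "# HELP " + name + " " + escapeHelp(help) + "\n"
                + "# TYPE " + name + " " + type + "\n";
    }

    public static void main(String[] args) {
        System.out.print(header("pulsar_batch_metadata_store_overflow_ops",
                OVERFLOW_OPS_HELP, "counter"));
    }
}
```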



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java:
##########
@@ -0,0 +1,128 @@
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.metadata.impl.batching.MetadataOp;
+import org.jctools.queues.MessagePassingQueue;
+
+public final class BatchMetadataStoreStats implements AutoCloseable {
+    private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000};
+    private static final String NAME = "name";
+    private static final String TYPE = "type";
+    private static final String OP_READ = "read";
+    private static final String OP_WRITE = "write";
+
+    private static final Gauge QUEUEING_OPS = Gauge
+            .build("pulsar_batch_metadata_store_queueing_ops", "-")
+            .labelNames(NAME, TYPE)
+            .register();
+    private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
+            .build("pulsar_batch_metadata_store_executor_queue_size", "-")
+            .labelNames(NAME)
+            .register();
+    private static final Counter OVERFLOW_OPS = Counter
+            .build("pulsar_batch_metadata_store_overflow_ops" , "-")
+            .labelNames(NAME, TYPE)
+            .register();
+    private static final Histogram BATCH_OPS_WAITING = Histogram
+            .build("pulsar_batch_metadata_store_waiting", "-")

Review Comment:
   `waiting` --> `queue_wait_time`
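The rename also pins down what is measured: the time an op spends in the queue, from `enqueue` until the batch executor drains it. A minimal JDK-only sketch of that measurement, with a hand-rolled cumulative histogram standing in for the Prometheus `Histogram` and reusing the `BUCKETS` bounds from the diff. The class and method names are hypothetical, and the millisecond unit is an assumption suggested by the `_waiting_ms` name used in the test.

```java
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.DoubleAdder;

// Hypothetical JDK-only stand-in for a Prometheus Histogram that records how
// long an op waited in the batching queue (enqueue to dequeue), in milliseconds.
final class QueueWaitTimeSketch {
    // Same upper bounds as BUCKETS in BatchMetadataStoreStats.
    static final double[] BUCKETS = {1, 5, 10, 20, 50, 100, 200, 500, 1000};

    // One slot per bound plus a final +Inf slot; each slot is non-cumulative.
    private final AtomicLongArray counts = new AtomicLongArray(BUCKETS.length + 1);
    private final DoubleAdder sumMillis = new DoubleAdder();

    // Wait time between enqueue and dequeue, both taken from System.nanoTime().
    static double waitMillis(long enqueuedAtNanos, long dequeuedAtNanos) {
        return (dequeuedAtNanos - enqueuedAtNanos) / 1_000_000.0;
    }

    void observe(double millis) {
        int i = 0;
        // Find the first bucket whose upper bound is >= the value ("le" semantics).
        while (i < BUCKETS.length && millis > BUCKETS[i]) {
            i++;
        }
        counts.incrementAndGet(i);
        sumMillis.add(millis);
    }

    // Cumulative count for bucket i, as exposed in the "_bucket{le=...}" series;
    // i == BUCKETS.length is the +Inf bucket, i.e. the total count.
    long cumulativeCount(int i) {
        long total = 0;
        for (int j = 0; j <= i; j++) {
            total += counts.get(j);
        }
        return total;
    }

    double sum() {
        return sumMillis.sum();
    }
}
```

At dequeue time the store would call `observe(waitMillis(enqueuedAt, System.nanoTime()))` for each op in the batch.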



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java:
##########
@@ -0,0 +1,128 @@
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.metadata.impl.batching.MetadataOp;
+import org.jctools.queues.MessagePassingQueue;
+
+public final class BatchMetadataStoreStats implements AutoCloseable {
+    private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000};
+    private static final String NAME = "name";
+    private static final String TYPE = "type";
+    private static final String OP_READ = "read";
+    private static final String OP_WRITE = "write";
+
+    private static final Gauge QUEUEING_OPS = Gauge
+            .build("pulsar_batch_metadata_store_queueing_ops", "-")
+            .labelNames(NAME, TYPE)
+            .register();
+    private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge

Review Comment:
   How frequently does this queue size change? If these metrics are scraped once every 30 seconds or 1 minute and the size changes frequently, this gauge won't tell you much.

   Maybe define a histogram, sample the queue size every second, and observe each sample into it?
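A minimal JDK-only sketch of the sampling approach: a scheduled task reads the queue size once per period and records it in a cumulative histogram instead of a gauge. `QueueSizeSampler` and its bucket bounds are hypothetical; in the PR the sampled value would presumably come from the executor's queue, e.g. `() -> executor.getQueue().size()` for a `ThreadPoolExecutor`, and the histogram would be a Prometheus `Histogram` rather than this hand-rolled one.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.IntSupplier;

// Hypothetical sampler: reads a queue size on a fixed schedule and records each
// sample in a cumulative histogram, so spikes between scrapes still show up.
final class QueueSizeSampler implements AutoCloseable {
    // Illustrative bucket bounds for queue sizes; tune for the real workload.
    static final double[] BUCKETS = {0, 1, 10, 100, 1000, 10000};

    private final AtomicLongArray counts = new AtomicLongArray(BUCKETS.length + 1);
    private final IntSupplier queueSize;
    private final ScheduledExecutorService scheduler;

    QueueSizeSampler(IntSupplier queueSize, long periodMillis) {
        this.queueSize = queueSize;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "queue-size-sampler");
            t.setDaemon(true); // never keep the JVM alive just for sampling
            return t;
        });
        scheduler.scheduleAtFixedRate(this::sampleOnce, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    // Take one sample now; also invoked by the scheduler every period.
    void sampleOnce() {
        int size = queueSize.getAsInt();
        int i = 0;
        while (i < BUCKETS.length && size > BUCKETS[i]) {
            i++;
        }
        counts.incrementAndGet(i);
    }

    // Cumulative count of samples <= BUCKETS[i]; i == BUCKETS.length means +Inf.
    long cumulativeCount(int i) {
        long total = 0;
        for (int j = 0; j <= i; j++) {
            total += counts.get(j);
        }
        return total;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With a 1-second period, `new QueueSizeSampler(() -> executor.getQueue().size(), 1000)` records about 30 samples per 30-second scrape interval, so the bucket distribution shows how often the queue was backed up, not just its size at the moment of the scrape.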
   



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -157,16 +162,29 @@ public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
     private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
         if (enabled) {
             if (!queue.offer(op)) {
+                if (queue.equals(this.readOps)) {

Review Comment:
   I don't think comparison by reference is an elegant way to determine the op's type.
   I would either use `if (isReadOp(op))` and implement it based on the enum value,
   or pass a new enum, `MetadataOpType.READ` or `MetadataOpType.WRITE`, to the function.
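A minimal sketch of the first option, assuming `MetadataOp.Type` has the constants `GET`, `GET_CHILDREN`, `PUT` and `DELETE` (mirrored here as a local `OpType`; check the actual enum before reusing this). `OpKind` and `OverflowCounter` are hypothetical names; `OpKind` could also serve as the second option's enum if it is passed into `enqueue` directly.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Assumed to mirror MetadataOp.Type; verify the real constants before reuse.
enum OpType { GET, GET_CHILDREN, PUT, DELETE }

// Hypothetical coarse kind, used as the metric "type" label ("read" / "write").
enum OpKind {
    READ("read"), WRITE("write");

    final String label;

    OpKind(String label) {
        this.label = label;
    }

    // Classify by the op's own type instead of comparing queue references.
    static OpKind of(OpType type) {
        switch (type) {
            case GET:
            case GET_CHILDREN:
                return READ;
            case PUT:
            case DELETE:
                return WRITE;
            default:
                throw new IllegalArgumentException("Unknown op type: " + type);
        }
    }
}

// Hypothetical overflow counter keyed by op kind; the map is filled once and never resized.
final class OverflowCounter {
    private final Map<OpKind, LongAdder> counts = new EnumMap<>(OpKind.class);

    OverflowCounter() {
        for (OpKind kind : OpKind.values()) {
            counts.put(kind, new LongAdder());
        }
    }

    // Called when queue.offer(op) fails.
    void recordOverflow(OpType type) {
        counts.get(OpKind.of(type)).increment();
    }

    long count(OpKind kind) {
        return counts.get(kind).sum();
    }
}
```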
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java:
##########
@@ -137,4 +137,108 @@ public void testMetadataStoreStats() throws Exception {
         }
     }
 
+    @Test
+    public void testBatchMetadataStoreMetrics() throws Exception {
+        String ns = "prop/ns-abc1";
+        admin.namespaces().createNamespace(ns);
+
+        String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID();
+        String subName = "my-sub1";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName(subName).subscribe();
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+        }
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
+        String metricsStr = output.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> opsOverflow = metricsMap.get("pulsar_batch_metadata_store_overflow_ops" + "_total");
+        Collection<PrometheusMetricsTest.Metric> queueingOps = metricsMap.get("pulsar_batch_metadata_store_queueing_ops");
+        Collection<PrometheusMetricsTest.Metric> executorQueueSize = metricsMap.get("pulsar_batch_metadata_store_executor_queue_size");
+        Collection<PrometheusMetricsTest.Metric> opsWaiting = metricsMap.get("pulsar_batch_metadata_store_waiting_ms" + "_sum");
+
+        Assert.assertTrue(opsOverflow.size() > 0 && opsOverflow.size() % 2 == 0);
+        Assert.assertTrue(queueingOps.size() > 0 && queueingOps.size() % 2 == 0);
+        Assert.assertTrue(executorQueueSize.size() > 1);
+        Assert.assertTrue(opsWaiting.size() > 1);
+
+        int readOpsOverflow = 0;
+        int writeOpsOverflow = 0;
+        for (PrometheusMetricsTest.Metric m : opsOverflow) {
+            Assert.assertEquals(m.tags.get("cluster"), "test");
+            String metadataStoreName = m.tags.get("name");
+            Assert.assertNotNull(metadataStoreName);
+            Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+            String opType = m.tags.get("type");
+            Assert.assertNotNull(opType);
+            if (opType.equals("read")) {
+                readOpsOverflow++;
+            } else if (opType.equals("write")){
+                writeOpsOverflow++;
+            }
+            Assert.assertTrue(m.value >= 0);
+        }
+        Assert.assertEquals(readOpsOverflow, writeOpsOverflow);
+        Assert.assertTrue(readOpsOverflow > 0);

Review Comment:
   Where did you set the queue size? How do you know for sure that 100 published messages will fill the queue?
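For the assertion to be deterministic, the test has to control the queue capacity and keep the consumer from draining the queue while ops are offered; otherwise whether 100 messages overflow depends on timing. A minimal JDK-only sketch of that reasoning, assuming a bounded queue (an `ArrayBlockingQueue` stands in for the jctools queue) and a test-chosen capacity: with capacity `c`, no draining, and `n > c` offers, exactly `n - c` ops overflow. `BoundedEnqueueSketch` is hypothetical and does not reproduce how the batched store sizes or drains its queues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical stand-in for the batching enqueue path: a bounded queue with a
// capacity chosen by the test, falling back to direct execution when full.
final class BoundedEnqueueSketch {
    private final Queue<String> queue;
    private final List<String> executedDirectly = new ArrayList<>();

    BoundedEnqueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void enqueue(String op) {
        if (!queue.offer(op)) {
            // Queue full: the op bypasses batching; this is what the overflow counter counts.
            executedDirectly.add(op);
        }
    }

    int overflowCount() {
        return executedDirectly.size();
    }

    int queuedCount() {
        return queue.size();
    }

    public static void main(String[] args) {
        // Nothing drains the queue, so the outcome does not depend on timing.
        BoundedEnqueueSketch store = new BoundedEnqueueSketch(10);
        for (int i = 0; i < 100; i++) {
            store.enqueue("op-" + i);
        }
        // prints "90 overflowed, 10 queued"
        System.out.println(store.overflowCount() + " overflowed, " + store.queuedCount() + " queued");
    }
}
```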


