You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/21 10:12:33 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #15140: [monitor][txn] Add metrics for transaction

eolivelli commented on code in PR #15140:
URL: https://github.com/apache/pulsar/pull/15140#discussion_r902428158


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java:
##########
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import io.prometheus.client.Counter;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class PendingAckHandleStatsImpl implements PendingAckHandleStats {
+    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
+    private static Counter commitTxnCounter;
+    private static Counter abortTxnCounter;
+    private static boolean exposeTopicLevelMetrics0;
+
+    private final String[] labelSucceed;
+    private final String[] labelFailed;
+
+    public PendingAckHandleStatsImpl(String topic, String subscription, boolean exposeTopicLevelMetrics) {
+        initialize(exposeTopicLevelMetrics);
+
+        String namespace;
+        if (StringUtils.isBlank(topic)) {
+            namespace = topic = "unknown";

Review Comment:
   do we have other metrics that report "unknown" as topic when it is not set ?
   I think that we should throw a error in this case, it is not possible to see a blank topic name here 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java:
##########
@@ -35,47 +37,88 @@
 public class TransactionBufferClientImpl implements TransactionBufferClient {
 
     private final TransactionBufferHandler tbHandler;
+    private final TransactionBufferClientStats stats;
 
-    private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
+    private TransactionBufferClientImpl(TransactionBufferHandler tbHandler, boolean exposeTopicLevelMetrics,
+                                        boolean enableTxnCoordinator) {
         this.tbHandler = tbHandler;
+        this.stats = TransactionBufferClientStats.create(exposeTopicLevelMetrics, tbHandler, enableTxnCoordinator);
     }
 
     public static TransactionBufferClient create(PulsarService pulsarService, HashedWheelTimer timer,
         int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
         TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarService, timer,
                 maxConcurrentRequests, operationTimeoutInMills);
-        return new TransactionBufferClientImpl(handler);
+
+        ServiceConfiguration config = pulsarService.getConfig();
+        boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+        boolean enableTxnCoordinator = config.isTransactionCoordinatorEnabled();
+        return new TransactionBufferClientImpl(handler, exposeTopicLevelMetrics, enableTxnCoordinator);
     }
 
     @Override
     public CompletableFuture<TxnID> commitTxnOnTopic(String topic, long txnIdMostBits,
                                                      long txnIdLeastBits, long lowWaterMark) {
-        return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.COMMIT, lowWaterMark);
+        long start = System.currentTimeMillis();

Review Comment:
   I think that we usually use "nanos", because millis will be usually 0 (hopefully)



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1867,6 +1867,9 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
         stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
         stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
         stats.publishRateLimitedTimes = publishRateLimitedTimes;
+        stats.ongoingTxnCount = getTransactionBuffer().getOngoingTxnCount();

Review Comment:
   nit: can we call getTransactionBuffer() only once ?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.naming.TopicName;
+
+public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats {
+    private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1};
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final Counter abortFailed;
+    private final Counter commitFailed;
+    private final Summary abortLatency;
+    private final Summary commitLatency;
+    private final Gauge pendingRequests;
+
+    private final boolean exposeTopicLevelMetrics;
+
+    private static TransactionBufferClientStats instance;
+
+    private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
+                                             TransactionBufferHandler handler) {
+        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+        String[] labelNames = exposeTopicLevelMetrics
+                ? new String[]{"namespace", "topic"} : new String[]{"namespace"};
+
+        this.abortFailed = Counter.build("pulsar_txn_tb_client_abort_failed", "-")
+                .labelNames(labelNames)
+                .register();

Review Comment:
   are we using the default global registry ?
   what happens if you run two brokers inside the same JVM ?
   is it a pattern that we follow for other metrics ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org