Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/21 12:47:48 UTC

[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #15140: [monitor][txn] Add metrics for transaction

nicoloboschi commented on code in PR #15140:
URL: https://github.com/apache/pulsar/pull/15140#discussion_r902569967


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java:
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientStatsImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+
+public interface TransactionBufferClientStats {
+
+    void recordAbortFailed(String topic);
+
+    void recordCommitFailed(String topic);
+
+    void recordAbortLatency(String topic, long cost);
+
+    void recordCommitLatency(String topic, long cost);
+
+    void close();

Review Comment:
   We can use `AutoCloseable` here: the interface could extend it instead of declaring its own `close()`.
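
A minimal sketch of what the `AutoCloseable` suggestion could look like. `ClientStatsSketch`, `InMemoryClientStats`, and `AutoCloseableStatsDemo` are hypothetical names invented for illustration; only `recordAbortFailed` and `close` mirror methods from the diff. Re-declaring `close()` without `throws Exception` is an assumption about what callers would prefer, not something the PR prescribes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down version of the stats interface from the diff.
interface ClientStatsSketch extends AutoCloseable {
    void recordAbortFailed(String topic);

    // AutoCloseable.close() declares `throws Exception`; narrowing it here
    // lets callers use try-with-resources without a catch block.
    @Override
    void close();
}

// Hypothetical in-memory implementation, present only to make the sketch runnable.
final class InMemoryClientStats implements ClientStatsSketch {
    final List<String> failedAborts = new ArrayList<>();
    boolean closed;

    @Override
    public void recordAbortFailed(String topic) {
        failedAborts.add(topic);
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class AutoCloseableStatsDemo {
    // Records one failure inside try-with-resources and returns the stats object.
    static InMemoryClientStats run() {
        InMemoryClientStats stats = new InMemoryClientStats();
        try (ClientStatsSketch s = stats) {
            s.recordAbortFailed("persistent://public/default/t1");
        }
        return stats;
    }
}
```

With this shape, `close()` runs automatically when the `try` block exits, so callers cannot forget it.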



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -245,6 +248,20 @@ public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxnEnabled)
         }
     }
 
+    @Override
+    public long getOngoingTxnCount() {

Review Comment:
   IIUC, `ongoingTxns` and `aborts` are true gauges because they can both increase and decrease,
   while `txnCommittedCounter` is only ever incremented, so it is a counter.
   Is that correct?
   If so, I believe we need to introduce a `txnAbortedCounter`.
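
To illustrate the gauge/counter distinction raised here, a rough sketch in plain Java follows. `TxnStatsSketch` and its methods other than `getOngoingTxnCount` are hypothetical and are not the Pulsar classes; the field names only echo the identifiers mentioned in the comment.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the buffer's bookkeeping; not the Pulsar class.
final class TxnStatsSketch {
    // Gauge-like: goes up when a transaction starts and down when it ends.
    private final AtomicLong ongoingTxns = new AtomicLong();
    // Counter-like: these totals only ever grow.
    private final LongAdder txnCommittedCounter = new LongAdder();
    private final LongAdder txnAbortedCounter = new LongAdder();

    void begin() {
        ongoingTxns.incrementAndGet();
    }

    void commit() {
        ongoingTxns.decrementAndGet();
        txnCommittedCounter.increment();
    }

    void abort() {
        ongoingTxns.decrementAndGet();
        txnAbortedCounter.increment();
    }

    long getOngoingTxnCount() {
        return ongoingTxns.get();
    }

    long getCommittedTxnCount() {
        return txnCommittedCounter.sum();
    }

    long getAbortedTxnCount() {
        return txnAbortedCounter.sum();
    }
}
```

A monotonic total is what rate-style queries expect, which is why an aborted-transaction counter would sit alongside the committed one.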



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java:
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleStatsImpl;
+
+public interface PendingAckHandleStats {
+
+    void recordCommitTxn(boolean success);
+
+    void recordAbortTxn(boolean success);
+
+    void close();

Review Comment:
   `AutoCloseable`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.naming.TopicName;
+
+public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats {
+    private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1};
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final Counter abortFailed;
+    private final Counter commitFailed;
+    private final Summary abortLatency;
+    private final Summary commitLatency;
+    private final Gauge pendingRequests;
+
+    private final boolean exposeTopicLevelMetrics;
+
+    private static TransactionBufferClientStats instance;
+
+    private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
+                                             TransactionBufferHandler handler) {
+        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+        String[] labelNames = exposeTopicLevelMetrics
+                ? new String[]{"namespace", "topic"} : new String[]{"namespace"};
+
+        this.abortFailed = Counter.build("pulsar_txn_tb_client_abort_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.commitFailed = Counter.build("pulsar_txn_tb_client_commit_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.abortLatency =
+                this.buildSummary("pulsar_txn_tb_client_abort_latency", "-", labelNames);
+        this.commitLatency =
+                this.buildSummary("pulsar_txn_tb_client_commit_latency", "-", labelNames);
+
+        this.pendingRequests = Gauge.build("pulsar_txn_tb_client_pending_requests", "-")
+                .register()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return null == handler ? 0 : handler.getPendingRequestsCount();
+                    }
+                });
+    }
+
+    private Summary buildSummary(String name, String help, String[] labelNames) {
+        Summary.Builder builder = Summary.build(name, help)
+                .labelNames(labelNames);
+        for (double quantile : QUANTILES) {
+            builder.quantile(quantile, 0.01D);
+        }
+        return builder.register();
+    }
+
+    public static synchronized TransactionBufferClientStats getInstance(boolean exposeTopicLevelMetrics,
+                                                                        TransactionBufferHandler handler) {
+        if (null == instance) {
+            instance = new TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler);
+        }
+
+        return instance;
+    }
+
+    @Override
+    public void recordAbortFailed(String topic) {
+        this.abortFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordCommitFailed(String topic) {
+        this.commitFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordAbortLatency(String topic, long cost) {
+        this.abortLatency.labels(labelValues(topic)).observe(cost);
+    }
+
+    @Override
+    public void recordCommitLatency(String topic, long cost) {
+        this.commitLatency.labels(labelValues(topic)).observe(cost);
+    }
+
+    private String[] labelValues(String topic) {
+        try {
+            TopicName topicName = TopicName.get(topic);
+            return exposeTopicLevelMetrics
+                    ? new String[]{topicName.getNamespace(), topic} : new String[]{topicName.getNamespace()};
+        } catch (Throwable t) {

Review Comment:
   Log the exception?
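
One possible shape for this, sketched with `java.util.logging` only so that the example is self-contained; the broker would use its own logging setup. `LabelValuesSketch` and `namespaceOf` are hypothetical: `namespaceOf` is a simplified stand-in for `TopicName.get(topic).getNamespace()`, and catching `RuntimeException` instead of `Throwable` is an assumption rather than part of the PR.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class LabelValuesSketch {
    private static final Logger LOG = Logger.getLogger(LabelValuesSketch.class.getName());

    // Hypothetical, simplified stand-in for TopicName.get(topic).getNamespace().
    static String namespaceOf(String topic) {
        String[] parts = topic.split("://", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
        String[] segments = parts[1].split("/");
        if (segments.length < 3) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
        return segments[0] + "/" + segments[1];
    }

    static String[] labelValues(String topic, boolean exposeTopicLevelMetrics) {
        try {
            String namespace = namespaceOf(topic);
            return exposeTopicLevelMetrics
                    ? new String[]{namespace, topic} : new String[]{namespace};
        } catch (RuntimeException e) {
            // Log before falling back, so malformed topic names do not vanish silently.
            LOG.log(Level.WARNING, "Failed to derive metric labels for topic " + topic, e);
            return exposeTopicLevelMetrics
                    ? new String[]{"unknown", "unknown"} : new String[]{"unknown"};
        }
    }
}
```

The fallback labels stay the same as in the diff; the only change is that the failure becomes visible in the logs.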



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.naming.TopicName;
+
+public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats {
+    private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1};
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final Counter abortFailed;
+    private final Counter commitFailed;
+    private final Summary abortLatency;
+    private final Summary commitLatency;
+    private final Gauge pendingRequests;
+
+    private final boolean exposeTopicLevelMetrics;
+
+    private static TransactionBufferClientStats instance;
+
+    private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
+                                             TransactionBufferHandler handler) {
+        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+        String[] labelNames = exposeTopicLevelMetrics
+                ? new String[]{"namespace", "topic"} : new String[]{"namespace"};
+
+        this.abortFailed = Counter.build("pulsar_txn_tb_client_abort_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.commitFailed = Counter.build("pulsar_txn_tb_client_commit_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.abortLatency =
+                this.buildSummary("pulsar_txn_tb_client_abort_latency", "-", labelNames);
+        this.commitLatency =
+                this.buildSummary("pulsar_txn_tb_client_commit_latency", "-", labelNames);
+
+        this.pendingRequests = Gauge.build("pulsar_txn_tb_client_pending_requests", "-")
+                .register()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return null == handler ? 0 : handler.getPendingRequestsCount();
+                    }
+                });
+    }
+
+    private Summary buildSummary(String name, String help, String[] labelNames) {
+        Summary.Builder builder = Summary.build(name, help)
+                .labelNames(labelNames);
+        for (double quantile : QUANTILES) {
+            builder.quantile(quantile, 0.01D);
+        }
+        return builder.register();
+    }
+
+    public static synchronized TransactionBufferClientStats getInstance(boolean exposeTopicLevelMetrics,
+                                                                        TransactionBufferHandler handler) {
+        if (null == instance) {
+            instance = new TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler);
+        }
+
+        return instance;
+    }
+
+    @Override
+    public void recordAbortFailed(String topic) {
+        this.abortFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordCommitFailed(String topic) {
+        this.commitFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordAbortLatency(String topic, long cost) {
+        this.abortLatency.labels(labelValues(topic)).observe(cost);
+    }
+
+    @Override
+    public void recordCommitLatency(String topic, long cost) {
+        this.commitLatency.labels(labelValues(topic)).observe(cost);
+    }
+
+    private String[] labelValues(String topic) {
+        try {
+            TopicName topicName = TopicName.get(topic);
+            return exposeTopicLevelMetrics
+                    ? new String[]{topicName.getNamespace(), topic} : new String[]{topicName.getNamespace()};
+        } catch (Throwable t) {
+            return exposeTopicLevelMetrics ? new String[]{"unknown", "unknown"} : new String[]{"unknown"};
+        }
+    }
+
+    @Override
+    public void close() {
+        if (instance == this && this.closed.compareAndSet(false, true)) {

Review Comment:
   Why is `instance == this` needed? If I call `close()` on this instance, I expect this object to be closed even if it is not the singleton.
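
A sketch of a `close()` that depends only on the object's own state, under the assumption that the cleanup should run at most once per instance. `IdempotentCloseSketch` and its `cleanups` counter are hypothetical; the counter stands in for whatever cleanup the real method performs, which is not visible in the quoted hunk.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class IdempotentCloseSketch implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Counts how often the cleanup actually ran (hypothetical placeholder).
    final AtomicInteger cleanups = new AtomicInteger();

    @Override
    public void close() {
        // No `instance == this` check: whichever object is closed releases
        // its own resources, and compareAndSet guarantees it happens once.
        if (closed.compareAndSet(false, true)) {
            cleanups.incrementAndGet();
        }
    }
}
```

Calling `close()` repeatedly, or from several threads, then triggers the cleanup exactly once for that object.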


