You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 02:18:37 UTC

[pulsar] 05/16: [Transaction] Add a check for uninitialized PendingAck (#13088)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c5da572ea40bdd20f1e5ada6a3ad8a6a60183e89
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Dec 6 22:00:38 2021 +0800

    [Transaction] Add a check for uninitialized PendingAck (#13088)
    
    ### Motivation
    
    We should not generate statistics for an uninitialized PendingAck, and we should check whether it is initialized when we get it via `getStoreManageLedger()`.
    ### Modifications
     Do not generate statistics for an uninitialized PendingAck.
     Add a check that it is initialized when we get it via `getStoreManageLedger()`.
    
    (cherry picked from commit 591b4e80a7652ed608c04b769052744179473f0a)
---
 .../service/persistent/PersistentSubscription.java |  4 ++
 .../stats/prometheus/TransactionAggregator.java    |  4 +-
 .../transaction/pendingack/PendingAckHandle.java   |  5 ++
 .../pendingack/impl/PendingAckHandleDisabled.java  |  5 ++
 .../pendingack/impl/PendingAckHandleImpl.java      |  7 ++-
 .../broker/stats/TransactionMetricsTest.java       | 68 ++++++++++++++++++++++
 6 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index acfd9ee..8d75ea7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1219,5 +1219,9 @@ public class PersistentSubscription implements Subscription {
         }
     }
 
+    public boolean checkIfPendingAckStoreInit() {
+        return this.pendingAckHandle.checkIfPendingAckStoreInit();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 142ec48..65399d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -64,7 +64,9 @@ public class TransactionAggregator {
                             topic.getSubscriptions().values().forEach(subscription -> {
                                 try {
                                     localManageLedgerStats.get().reset();
-                                    if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))) {
+                                    if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
+                                            && subscription instanceof  PersistentSubscription
+                                            && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
                                         ManagedLedger managedLedger =
                                                 ((PersistentSubscription) subscription)
                                                         .getPendingAckManageLedger().get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index 3664c5d..dc64cbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -159,4 +159,9 @@ public interface PendingAckHandle {
      */
     CompletableFuture<Void> close();
 
+    /**
+     * Check if the PendingAckStore is init.
+     * @return if the PendingAckStore is init.
+     */
+    boolean checkIfPendingAckStoreInit();
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index cf6b5c8..634655e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -99,4 +99,9 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
     public CompletableFuture<Void> close() {
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public boolean checkIfPendingAckStoreInit() {
+        return false;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 78bab96..d92793a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -923,7 +923,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
     }
 
     public CompletableFuture<ManagedLedger> getStoreManageLedger() {
-        if (this.pendingAckStoreFuture.isDone()) {
+        if (this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone()) {
             return this.pendingAckStoreFuture.thenCompose(pendingAckStore -> {
                 if (pendingAckStore instanceof MLPendingAckStore) {
                     return ((MLPendingAckStore) pendingAckStore).getManagedLedger();
@@ -937,6 +937,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         }
     }
 
+    @Override
+    public boolean checkIfPendingAckStoreInit() {
+        return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
+    }
+
     protected void handleCacheRequest() {
         while (true) {
             Runnable runnable = acceptQueue.poll();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index cb8e430..6a4b5c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -271,6 +272,73 @@ public class TransactionMetricsTest extends BrokerTestBase {
         assertEquals(metric.size(), 2);
     }
 
+    @Test
+    public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception{
+        String ns1 = "prop/ns-abc1";
+        admin.namespaces().createNamespace(ns1);
+        String topic = "persistent://" + ns1 + "/testManagedLedgerMetricsWhenPendingAckNotInit";
+        String subName = "test_managed_ledger_metrics";
+        String subName2 = "test_pending_ack_no_init";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
+        pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
+        admin.topics().createSubscription(topic, subName, MessageId.earliest);
+        admin.topics().createSubscription(topic, subName2, MessageId.earliest);
+
+        Awaitility.await().atMost(2000,  TimeUnit.MILLISECONDS).until(() ->
+                pulsar.getTransactionMetadataStoreService().getStores().size() == 1);
+
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .receiverQueueSize(10)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        Transaction transaction =
+                pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        producer.send("hello pulsar".getBytes());
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+
+        Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> metric = metrics.get("pulsar_storage_size");
+        checkManagedLedgerMetrics(subName, 32, metric);
+        //No statistics of the pendingAck are generated when the pendingAck is not initialized.
+        for (PrometheusMetricsTest.Metric metric1 : metric) {
+            if (metric1.tags.containsValue(subName2)) {
+                Assert.fail();
+            }
+        }
+
+        consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .receiverQueueSize(10)
+                .subscriptionName(subName2)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        transaction =
+                pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+
+        statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        metricsStr = statsOut.toString();
+        metrics = parseMetrics(metricsStr);
+        metric = metrics.get("pulsar_storage_size");
+        checkManagedLedgerMetrics(subName2, 32, metric);
+    }
+
     private void checkManagedLedgerMetrics(String tag, double value, Collection<PrometheusMetricsTest.Metric> metrics) {
         boolean exist = false;
         for (PrometheusMetricsTest.Metric metric1 : metrics) {