You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 02:18:37 UTC
[pulsar] 05/16: [Transaction] Add a check for uninitialized PendingAck (#13088)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c5da572ea40bdd20f1e5ada6a3ad8a6a60183e89
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Dec 6 22:00:38 2021 +0800
[Transaction] Add a check for uninitialized PendingAck (#13088)
### Motivation
We should not generate statistics for an uninitialized PendingAck, and we should check whether it is initialized when we get it via `getStoreManageLedger()`.
### Modifications
Do not generate statistics for an uninitialized PendingAck.
Add a check that it is initialized when we get it via `getStoreManageLedger()`.
(cherry picked from commit 591b4e80a7652ed608c04b769052744179473f0a)
---
.../service/persistent/PersistentSubscription.java | 4 ++
.../stats/prometheus/TransactionAggregator.java | 4 +-
.../transaction/pendingack/PendingAckHandle.java | 5 ++
.../pendingack/impl/PendingAckHandleDisabled.java | 5 ++
.../pendingack/impl/PendingAckHandleImpl.java | 7 ++-
.../broker/stats/TransactionMetricsTest.java | 68 ++++++++++++++++++++++
6 files changed, 91 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index acfd9ee..8d75ea7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1219,5 +1219,9 @@ public class PersistentSubscription implements Subscription {
}
}
+ public boolean checkIfPendingAckStoreInit() {
+ return this.pendingAckHandle.checkIfPendingAckStoreInit();
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 142ec48..65399d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -64,7 +64,9 @@ public class TransactionAggregator {
topic.getSubscriptions().values().forEach(subscription -> {
try {
localManageLedgerStats.get().reset();
- if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))) {
+ if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
+ && subscription instanceof PersistentSubscription
+ && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
ManagedLedger managedLedger =
((PersistentSubscription) subscription)
.getPendingAckManageLedger().get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index 3664c5d..dc64cbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -159,4 +159,9 @@ public interface PendingAckHandle {
*/
CompletableFuture<Void> close();
+ /**
+ * Check if the PendingAckStore is init.
+ * @return if the PendingAckStore is init.
+ */
+ boolean checkIfPendingAckStoreInit();
}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index cf6b5c8..634655e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -99,4 +99,9 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public boolean checkIfPendingAckStoreInit() {
+ return false;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 78bab96..d92793a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -923,7 +923,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
public CompletableFuture<ManagedLedger> getStoreManageLedger() {
- if (this.pendingAckStoreFuture.isDone()) {
+ if (this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone()) {
return this.pendingAckStoreFuture.thenCompose(pendingAckStore -> {
if (pendingAckStore instanceof MLPendingAckStore) {
return ((MLPendingAckStore) pendingAckStore).getManagedLedger();
@@ -937,6 +937,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
}
+ @Override
+ public boolean checkIfPendingAckStoreInit() {
+ return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
+ }
+
protected void handleCacheRequest() {
while (true) {
Runnable runnable = acceptQueue.poll();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index cb8e430..6a4b5c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -271,6 +272,73 @@ public class TransactionMetricsTest extends BrokerTestBase {
assertEquals(metric.size(), 2);
}
+ @Test
+ public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception{
+ String ns1 = "prop/ns-abc1";
+ admin.namespaces().createNamespace(ns1);
+ String topic = "persistent://" + ns1 + "/testManagedLedgerMetricsWhenPendingAckNotInit";
+ String subName = "test_managed_ledger_metrics";
+ String subName2 = "test_pending_ack_no_init";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+ TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
+ pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
+ admin.topics().createSubscription(topic, subName, MessageId.earliest);
+ admin.topics().createSubscription(topic, subName2, MessageId.earliest);
+
+ Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
+ pulsar.getTransactionMetadataStoreService().getStores().size() == 1);
+
+ pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .receiverQueueSize(10)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+
+ Transaction transaction =
+ pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+ producer.send("hello pulsar".getBytes());
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+ String metricsStr = statsOut.toString();
+
+ Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);
+
+ Collection<PrometheusMetricsTest.Metric> metric = metrics.get("pulsar_storage_size");
+ checkManagedLedgerMetrics(subName, 32, metric);
+ //No statistics of the pendingAck are generated when the pendingAck is not initialized.
+ for (PrometheusMetricsTest.Metric metric1 : metric) {
+ if (metric1.tags.containsValue(subName2)) {
+ Assert.fail();
+ }
+ }
+
+ consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .receiverQueueSize(10)
+ .subscriptionName(subName2)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+ transaction =
+ pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+
+ statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+ metricsStr = statsOut.toString();
+ metrics = parseMetrics(metricsStr);
+ metric = metrics.get("pulsar_storage_size");
+ checkManagedLedgerMetrics(subName2, 32, metric);
+ }
+
private void checkManagedLedgerMetrics(String tag, double value, Collection<PrometheusMetricsTest.Metric> metrics) {
boolean exist = false;
for (PrometheusMetricsTest.Metric metric1 : metrics) {