You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/05/25 12:55:02 UTC
[pulsar] branch master updated: [fix][broker] Fix creating producer failure when set backlog quota. (#15663)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3a8045851f7 [fix][broker] Fix creating producer failure when set backlog quota. (#15663)
3a8045851f7 is described below
commit 3a8045851f7e9ea62da104dab2b7fe2b47a95ca9
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed May 25 05:54:55 2022 -0700
[fix][broker] Fix creating producer failure when set backlog quota. (#15663)
### Motivation
While trying to reproduce the problem described in #15609 with the code on master, it was found that master also has this bug. The root cause is:
When there is only one ledger in the ManagedLedger and that ledger has been closed, it has a timestamp; once its age exceeds the time limit set by the backlog quota, creating a producer fails.
The added test could reproduce this.
So when there is only one ledger (i.e. the slowest reader is on the last ledger), the time-based backlog quota check should not report the quota as exceeded.
### Verifying this change
If this patch is reverted, the added test will fail.
---
.../broker/service/persistent/PersistentTopic.java | 47 +++++++++++-----
.../systopic/PartitionedSystemTopicTest.java | 64 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index dec5d80cb5f..30de90ac1a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2551,22 +2551,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}, null);
return future;
} else {
- Long ledgerId = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
+ PositionImpl slowestPosition = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition();
try {
- org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
- ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
- if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
- && ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
- > backlogQuotaLimitInSecond * 1000) {
- if (log.isDebugEnabled()) {
- log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
- + "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
- ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
- }
- return CompletableFuture.completedFuture(true);
- } else {
- return CompletableFuture.completedFuture(false);
- }
+ return slowestReaderTimeBasedBacklogQuotaCheck(slowestPosition);
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
return CompletableFuture.completedFuture(false);
@@ -2574,6 +2561,36 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
+ private CompletableFuture<Boolean> slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition)
+ throws ExecutionException, InterruptedException {
+ int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
+ Long ledgerId = slowestPosition.getLedgerId();
+ if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
+ return CompletableFuture.completedFuture(false);
+ }
+ int result;
+ org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
+ ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
+ if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
+ && ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
+ > backlogQuotaLimitInSecond * 1000 && (result = slowestPosition.compareTo(
+ new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1))) <= 0) {
+ if (result < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
+ + "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
+ ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
+ }
+ return CompletableFuture.completedFuture(true);
+ } else {
+ return slowestReaderTimeBasedBacklogQuotaCheck(
+ ((ManagedLedgerImpl) ledger).getNextValidPosition(slowestPosition));
+ }
+ } else {
+ return CompletableFuture.completedFuture(false);
+ }
+ }
+
@Override
public boolean isReplicated() {
return !replicators.isEmpty();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index ecb5c085de3..d4ed12573f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -27,15 +27,19 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.naming.TopicName;
@@ -44,6 +48,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -51,6 +56,7 @@ import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -66,6 +72,8 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
conf.setAllowAutoTopicCreation(false);
conf.setAllowAutoTopicCreationType("partitioned");
conf.setDefaultNumPartitions(PARTITIONS);
+ conf.setManagedLedgerMaxEntriesPerLedger(1);
+ conf.setBrokerDeleteInactiveTopicsEnabled(false);
super.baseSetup();
}
@@ -171,4 +179,60 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
});
}
+ @Test
+ private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
+ final String ns = "prop/ns-test";
+ final String topic = ns + "/topic-1";
+
+ admin.namespaces().createNamespace(ns, 2);
+ admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
+ BacklogQuota quota = BacklogQuota.builder()
+ .limitTime(2)
+ .limitSize(-1)
+ .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+ .build();
+ admin.namespaces().setBacklogQuota(ns, quota, BacklogQuota.BacklogQuotaType.message_age);
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ String partition0 = TopicName.get(String.format("persistent://%s", topic)).getPartition(0).toString();
+ Optional<Topic> topicReference = pulsar.getBrokerService().getTopicReference(partition0);
+ Assert.assertTrue(topicReference.isPresent());
+ PersistentTopic persistentTopic = (PersistentTopic) topicReference.get();
+ ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
+ config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+ config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+ persistentTopic.getManagedLedger().setConfig(config);
+ Whitebox.invokeMethod(persistentTopic.getManagedLedger(), "updateLastLedgerCreatedTimeAndScheduleRolloverTask");
+ String msg1 = "msg-1";
+ producer.send(msg1);
+ Thread.sleep(3 * 1000);
+
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub-1")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ Message<String> receive = consumer2.receive();
+ consumer2.acknowledge(receive);
+
+ Thread.sleep(3 * 1000);
+
+ try {
+ Producer<String> producerN = PulsarClient.builder()
+ .maxBackoffInterval(3, TimeUnit.SECONDS)
+ .operationTimeout(5, TimeUnit.SECONDS)
+ .serviceUrl(lookupUrl.toString()).connectionTimeout(2, TimeUnit.SECONDS).build()
+ .newProducer(Schema.STRING).topic(topic).sendTimeout(3, TimeUnit.SECONDS).create();
+ Assert.assertTrue(producerN.isConnected());
+ producerN.close();
+ } catch (Exception ex) {
+ Assert.fail("failed to create producer");
+ }
+ }
}