You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/26 02:11:52 UTC

[pulsar] branch branch-2.9 updated: [fix][broker] Fix creating producer failure when set backlog quota. (#15663)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 90c4653aea8 [fix][broker] Fix creating producer failure when set backlog quota. (#15663)
90c4653aea8 is described below

commit 90c4653aea848953696599c72553952b3ef0b53f
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed May 25 05:54:55 2022 -0700

    [fix][broker] Fix creating producer failure when set backlog quota. (#15663)
    
    When trying to reproduce the problem of #15609 using the master's code, it was found that the master also had this bug. The root cause is:
    When there is only one ledger in the ManagedLedger, after the current ledger is closed, it has the timestamp and exceeds the time set by the backlog-qutoa, resulting in the failure to create the producer.
    
    The added test could reproduce this.
    
    So when there is only one ledger, we should not exclude it.
    
    If revert this patch, the added test will fail.
    
    (cherry picked from commit 3a8045851f7e9ea62da104dab2b7fe2b47a95ca9)
---
 .../broker/service/persistent/PersistentTopic.java | 47 +++++++++++-----
 .../systopic/PartitionedSystemTopicTest.java       | 64 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6bcfa36320c..db824b0706b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2626,22 +2626,9 @@ public class PersistentTopic extends AbstractTopic
                 return false;
             }
         } else {
-            Long ledgerId = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
+            PositionImpl slowestPosition = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition();
             try {
-                org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
-                        ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
-                if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
-                        && ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
-                        > backlogQuotaLimitInSecond * 1000) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
-                                        + "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
-                                ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
-                    }
-                    return true;
-                } else {
-                    return false;
-                }
+                return slowestReaderTimeBasedBacklogQuotaCheck(slowestPosition);
             } catch (Exception e) {
                 log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
                 return false;
@@ -2649,6 +2636,36 @@ public class PersistentTopic extends AbstractTopic
         }
     }
 
+    private boolean slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition)
+            throws ExecutionException, InterruptedException {
+        int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
+        Long ledgerId = slowestPosition.getLedgerId();
+        if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
+            return false;
+        }
+        int result;
+        org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
+                ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
+        if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
+                && ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
+                > backlogQuotaLimitInSecond * 1000 && (result = slowestPosition.compareTo(
+                new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1))) <= 0) {
+            if (result < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
+                                    + "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
+                            ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
+                }
+                return true;
+            } else {
+                return slowestReaderTimeBasedBacklogQuotaCheck(
+                        ((ManagedLedgerImpl) ledger).getNextValidPosition(slowestPosition));
+            }
+        } else {
+            return false;
+        }
+    }
+
     @Override
     public boolean isReplicated() {
         return !replicators.isEmpty();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d506f712767..cf45d614f0f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -27,14 +27,18 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pulsar.broker.admin.impl.BrokersBase;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.naming.TopicName;
@@ -43,6 +47,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -50,6 +55,7 @@ import org.testng.annotations.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -65,6 +71,8 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         conf.setAllowAutoTopicCreation(false);
         conf.setAllowAutoTopicCreationType("partitioned");
         conf.setDefaultNumPartitions(PARTITIONS);
+        conf.setManagedLedgerMaxEntriesPerLedger(1);
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
 
         conf.setSystemTopicEnabled(true);
         conf.setTopicLevelPoliciesEnabled(true);
@@ -170,4 +178,60 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         });
     }
 
+    @Test
+    private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/topic-1";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
+        BacklogQuota quota = BacklogQuota.builder()
+                .limitTime(2)
+                .limitSize(-1)
+                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                .build();
+        admin.namespaces().setBacklogQuota(ns, quota, BacklogQuota.BacklogQuotaType.message_age);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        String partition0 = TopicName.get(String.format("persistent://%s", topic)).getPartition(0).toString();
+        Optional<Topic> topicReference = pulsar.getBrokerService().getTopicReference(partition0);
+        Assert.assertTrue(topicReference.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topicReference.get();
+        ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
+        config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+        config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+        persistentTopic.getManagedLedger().setConfig(config);
+        Whitebox.invokeMethod(persistentTopic.getManagedLedger(), "updateLastLedgerCreatedTimeAndScheduleRolloverTask");
+        String msg1 = "msg-1";
+        producer.send(msg1);
+        Thread.sleep(3 * 1000);
+
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub-1")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Message<String> receive = consumer2.receive();
+        consumer2.acknowledge(receive);
+
+        Thread.sleep(3 * 1000);
+
+        try {
+            Producer<String> producerN = PulsarClient.builder()
+                    .maxBackoffInterval(3, TimeUnit.SECONDS)
+                    .operationTimeout(5, TimeUnit.SECONDS)
+                    .serviceUrl(lookupUrl.toString()).connectionTimeout(2, TimeUnit.SECONDS).build()
+                    .newProducer(Schema.STRING).topic(topic).sendTimeout(3, TimeUnit.SECONDS).create();
+            Assert.assertTrue(producerN.isConnected());
+            producerN.close();
+        } catch (Exception ex) {
+            Assert.fail("failed to create producer");
+        }
+    }
 }