You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/05/25 12:55:02 UTC

[pulsar] branch master updated: [fix][broker] Fix creating producer failure when set backlog quota. (#15663)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a8045851f7 [fix][broker] Fix creating producer failure when set backlog quota. (#15663)
3a8045851f7 is described below

commit 3a8045851f7e9ea62da104dab2b7fe2b47a95ca9
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed May 25 05:54:55 2022 -0700

    [fix][broker] Fix creating producer failure when set backlog quota. (#15663)
    
    ### Motivation
    
    When trying to reproduce the problem of #15609 using the master's code, it was found that the master also had this bug. The root cause is:
    When the ManagedLedger contains only one ledger and that ledger has been closed, it carries a timestamp; once that timestamp is older than the time limit set by the backlog quota, creating the producer fails.
    
    The added test could reproduce this.
    
    So when there is only one ledger, we should not exclude it.
    
    ### Verifying this change
    
    If this patch is reverted, the added test will fail.
---
 .../broker/service/persistent/PersistentTopic.java | 47 +++++++++++-----
 .../systopic/PartitionedSystemTopicTest.java       | 64 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index dec5d80cb5f..30de90ac1a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2551,22 +2551,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     }, null);
             return future;
         } else {
-            Long ledgerId = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
+            PositionImpl slowestPosition = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition();
             try {
-                org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
-                        ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
-                if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
-                        && ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
-                        > backlogQuotaLimitInSecond * 1000) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
-                                        + "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
-                                ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
-                    }
-                    return CompletableFuture.completedFuture(true);
-                } else {
-                    return CompletableFuture.completedFuture(false);
-                }
+                return slowestReaderTimeBasedBacklogQuotaCheck(slowestPosition);
             } catch (Exception e) {
                 log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
                 return CompletableFuture.completedFuture(false);
@@ -2574,6 +2561,36 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
+    private CompletableFuture<Boolean> slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition)
+            throws ExecutionException, InterruptedException {
+        int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
+        Long ledgerId = slowestPosition.getLedgerId();
+        if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
+            return CompletableFuture.completedFuture(false);
+        }
+        int result;
+        org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
+                ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
+        if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
+                && ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
+                > backlogQuotaLimitInSecond * 1000 && (result = slowestPosition.compareTo(
+                new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1))) <= 0) {
+            if (result < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
+                                    + "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
+                            ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
+                }
+                return CompletableFuture.completedFuture(true);
+            } else {
+                return slowestReaderTimeBasedBacklogQuotaCheck(
+                        ((ManagedLedgerImpl) ledger).getNextValidPosition(slowestPosition));
+            }
+        } else {
+            return CompletableFuture.completedFuture(false);
+        }
+    }
+
     @Override
     public boolean isReplicated() {
         return !replicators.isEmpty();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index ecb5c085de3..d4ed12573f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -27,15 +27,19 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pulsar.broker.admin.impl.BrokersBase;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.naming.TopicName;
@@ -44,6 +48,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -51,6 +56,7 @@ import org.testng.annotations.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -66,6 +72,8 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         conf.setAllowAutoTopicCreation(false);
         conf.setAllowAutoTopicCreationType("partitioned");
         conf.setDefaultNumPartitions(PARTITIONS);
+        conf.setManagedLedgerMaxEntriesPerLedger(1);
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
 
         super.baseSetup();
     }
@@ -171,4 +179,60 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         });
     }
 
+    @Test
+    private void testSetBacklogCausedCreatingProducerFailure() throws Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/topic-1";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
+        BacklogQuota quota = BacklogQuota.builder()
+                .limitTime(2)
+                .limitSize(-1)
+                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                .build();
+        admin.namespaces().setBacklogQuota(ns, quota, BacklogQuota.BacklogQuotaType.message_age);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        String partition0 = TopicName.get(String.format("persistent://%s", topic)).getPartition(0).toString();
+        Optional<Topic> topicReference = pulsar.getBrokerService().getTopicReference(partition0);
+        Assert.assertTrue(topicReference.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topicReference.get();
+        ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
+        config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+        config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+        persistentTopic.getManagedLedger().setConfig(config);
+        Whitebox.invokeMethod(persistentTopic.getManagedLedger(), "updateLastLedgerCreatedTimeAndScheduleRolloverTask");
+        String msg1 = "msg-1";
+        producer.send(msg1);
+        Thread.sleep(3 * 1000);
+
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub-1")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Message<String> receive = consumer2.receive();
+        consumer2.acknowledge(receive);
+
+        Thread.sleep(3 * 1000);
+
+        try {
+            Producer<String> producerN = PulsarClient.builder()
+                    .maxBackoffInterval(3, TimeUnit.SECONDS)
+                    .operationTimeout(5, TimeUnit.SECONDS)
+                    .serviceUrl(lookupUrl.toString()).connectionTimeout(2, TimeUnit.SECONDS).build()
+                    .newProducer(Schema.STRING).topic(topic).sendTimeout(3, TimeUnit.SECONDS).create();
+            Assert.assertTrue(producerN.isConnected());
+            producerN.close();
+        } catch (Exception ex) {
+            Assert.fail("failed to create producer");
+        }
+    }
 }