You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/13 06:50:00 UTC

[pulsar] branch branch-2.11 updated (1fc4ccf8240 -> 1adf0427c9d)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 1fc4ccf8240 [bug][broker] fix memory leak in case of error conditions in PendingReadsManager (#17995)
     new 760b1adf56f [fix][broker] Fix system service namespace create internal event topic. (#17867)
     new 1adf0427c9d [fix][broker] Fix `getPositionAfterN` infinite loop. (#17971)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 ++++--------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 17 ++++++++++++
 .../pulsar/broker/service/BrokerService.java       |  3 ++-
 .../SystemTopicBasedTopicPoliciesService.java      |  4 +++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  2 +-
 .../systopic/PartitionedSystemTopicTest.java       | 30 +++++++++++++++++++---
 6 files changed, 55 insertions(+), 18 deletions(-)


[pulsar] 02/02: [fix][broker] Fix `getPositionAfterN` infinite loop. (#17971)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1adf0427c9dd0e1c2e56ec1a3bb1ad2c2110ca54
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Oct 13 14:46:14 2022 +0800

    [fix][broker] Fix `getPositionAfterN` infinite loop. (#17971)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java      | 17 +++++------------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java      | 17 +++++++++++++++++
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 263a3612ceb..230deb27d17 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3274,20 +3274,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         long entriesToSkip = n;
         long currentLedgerId;
         long currentEntryId;
-
         if (startRange == PositionBound.startIncluded) {
             currentLedgerId = startPosition.getLedgerId();
             currentEntryId = startPosition.getEntryId();
         } else {
-            // e.g. a mark-delete position
             PositionImpl nextValidPosition = getNextValidPosition(startPosition);
             currentLedgerId = nextValidPosition.getLedgerId();
             currentEntryId = nextValidPosition.getEntryId();
         }
-
         boolean lastLedger = false;
         long totalEntriesInCurrentLedger;
-
         while (entriesToSkip >= 0) {
             // for the current ledger, the number of entries written is deduced from the lastConfirmedEntry
             // for previous ledgers, LedgerInfo in ZK has the number of entries
@@ -3302,10 +3298,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 LedgerInfo ledgerInfo = ledgers.get(currentLedgerId);
                 totalEntriesInCurrentLedger = ledgerInfo != null ? ledgerInfo.getEntries() : 0;
             }
-
-
-            long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger - currentEntryId;
-
+            long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger > 0
+                    ? totalEntriesInCurrentLedger - currentEntryId : 0;
             if (unreadEntriesInCurrentLedger >= entriesToSkip) {
                 // if the current ledger has more entries than what we need to skip
                 // then the return position is in the same ledger
@@ -3318,11 +3312,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     // there are no more ledgers, return the last position
                     currentEntryId = totalEntriesInCurrentLedger;
                     break;
-                } else {
-                    Long lid = ledgers.ceilingKey(currentLedgerId + 1);
-                    currentLedgerId = lid != null ? lid : (ledgers.lastKey() + 1);
-                    currentEntryId = 0;
                 }
+                Long lid = ledgers.ceilingKey(currentLedgerId + 1);
+                currentLedgerId = lid != null ? lid : ledgers.lastKey();
+                currentEntryId = 0;
             }
         }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 37fa56cd989..7eee82be9dd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2567,6 +2567,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         log.info("Target position is {}", targetPosition);
         assertEquals(targetPosition.getLedgerId(), secondLedger);
         assertEquals(targetPosition.getEntryId(), 4);
+
+        // test for n > NumberOfEntriesInStorage
+        searchPosition = new PositionImpl(secondLedger, 0);
+        targetPosition = managedLedger.getPositionAfterN(searchPosition, 100, ManagedLedgerImpl.PositionBound.startIncluded);
+        assertEquals(targetPosition.getLedgerId(), secondLedger);
+        assertEquals(targetPosition.getEntryId(), 4);
+
+        // test for startPosition > current ledger
+        searchPosition = new PositionImpl(999, 0);
+        targetPosition = managedLedger.getPositionAfterN(searchPosition, 0, ManagedLedgerImpl.PositionBound.startIncluded);
+        assertEquals(targetPosition.getLedgerId(), secondLedger);
+        assertEquals(targetPosition.getEntryId(), 4);
+
+        searchPosition = new PositionImpl(999, 0);
+        targetPosition = managedLedger.getPositionAfterN(searchPosition, 10, ManagedLedgerImpl.PositionBound.startExcluded);
+        assertEquals(targetPosition.getLedgerId(), secondLedger);
+        assertEquals(targetPosition.getEntryId(), 4);
     }
 
     @Test


[pulsar] 01/02: [fix][broker] Fix system service namespace create internal event topic. (#17867)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 760b1adf56f5424dedfed4c41805c8874463f29b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Oct 13 13:17:16 2022 +0800

    [fix][broker] Fix system service namespace create internal event topic. (#17867)
---
 .../pulsar/broker/service/BrokerService.java       |  3 ++-
 .../SystemTopicBasedTopicPoliciesService.java      |  4 +++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  2 +-
 .../systopic/PartitionedSystemTopicTest.java       | 30 +++++++++++++++++++---
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b704044e139..b437f976837 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1507,7 +1507,8 @@ public class BrokerService implements Closeable {
                     RetentionPolicies retentionPolicies = null;
                     OffloadPoliciesImpl topicLevelOffloadPolicies = null;
 
-                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
+                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+                            && !NamespaceService.isSystemServiceNamespace(namespace.toString())) {
                         try {
                             TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                             if (topicPolicies != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index f5db6e2311b..26879bff3ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -101,6 +101,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
                                                          TopicPolicies policies) {
+        if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+            return CompletableFuture.failedFuture(
+                    new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic"));
+        }
         CompletableFuture<Void> result = new CompletableFuture<>();
         try {
             createSystemTopicFactoryIfNeeded();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 31b3ee5355a..bb8749e9226 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -524,7 +524,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
      * @param namespaceName
      * @throws Exception
      */
-    @Test(dataProvider = "namespaceNames", timeOut = 10000)
+    @Test(dataProvider = "namespaceNames", timeOut = 30000)
     public void testResetCursorOnPosition(String namespaceName) throws Exception {
         final String topicName = "persistent://prop-xyz/use/" + namespaceName + "/resetPosition";
         final int totalProducedMessages = 50;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d4ed12573f3..d2ae23bb6c9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -172,10 +173,31 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
         config.setLedgerOffloader(ledgerOffloader);
         Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
-        admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
-        Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
-            Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
-                    NullLedgerOffloader.INSTANCE);
+    }
+
+    @Test
+    public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        Optional<Topic> optionalTopic = pulsar.getBrokerService()
+                .getTopic(topicName.getPartition(1).toString(), false).join();
+        Assert.assertTrue(optionalTopic.isEmpty());
+    }
+
+    @Test
+    public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        for (int partition = 0; partition < PARTITIONS; partition ++) {
+            pulsar.getBrokerService()
+                    .getTopic(topicName.getPartition(partition).toString(), true).join();
+        }
+        Assert.assertThrows(PulsarAdminException.ConflictException.class, () -> {
+            admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
         });
     }