You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/06 07:22:26 UTC

[pulsar] branch master updated: [fix][broker] Avoid heartbeat topic to offload. (#15008)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cdb67e43622 [fix][broker] Avoid heartbeat topic to offload. (#15008)
cdb67e43622 is described below

commit cdb67e436225fce52105986c8f3906ed331f6f4d
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Apr 6 15:22:19 2022 +0800

    [fix][broker] Avoid heartbeat topic to offload. (#15008)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 +++
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |  2 +-
 .../pulsar/broker/service/BrokerService.java       | 28 ++++++++++-------
 .../systopic/PartitionedSystemTopicTest.java       | 35 ++++++++++++++++++++++
 4 files changed, 57 insertions(+), 12 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7eccca1c17a..37858afe3d8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2847,6 +2847,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     @Override
     public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
+        if (config.getLedgerOffloader() != null && config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE) {
+            callback.offloadFailed(new ManagedLedgerException("NullLedgerOffloader"), ctx);
+            return;
+        }
         PositionImpl requestOffloadTo = (PositionImpl) pos;
         if (!isValidPosition(requestOffloadTo)
                 // Also consider the case where the last ledger is currently
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index d0da8e65b51..a59f53d4575 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -85,7 +85,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
             ledger.offloadPrefix(p);
             fail("Should have thrown an exception");
         } catch (ManagedLedgerException e) {
-            assertEquals(e.getCause().getClass(), CompletionException.class);
+            assertEquals(e.getMessage(), "NullLedgerOffloader");
         }
         assertEquals(ledger.getLedgersInfoAsList().size(), 5);
         assertEquals(ledger.getLedgersInfoAsList().stream()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 530bfd067ed..8829841db6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -87,6 +87,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
@@ -102,6 +103,7 @@ import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
@@ -1604,18 +1606,22 @@ public class BrokerService implements Closeable {
                             topicLevelOffloadPolicies,
                             OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
                             getPulsar().getConfig().getProperties());
-                    if (topicLevelOffloadPolicies != null) {
-                        try {
-                            LedgerOffloader topicLevelLedgerOffLoader =
-                                    pulsar().createManagedLedgerOffloader(offloadPolicies);
-                            managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                        } catch (PulsarServerException e) {
-                            throw new RuntimeException(e);
+                    if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+                        managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+                    } else  {
+                        if (topicLevelOffloadPolicies != null) {
+                            try {
+                                LedgerOffloader topicLevelLedgerOffLoader =
+                                        pulsar().createManagedLedgerOffloader(offloadPolicies);
+                                managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                            } catch (PulsarServerException e) {
+                                throw new RuntimeException(e);
+                            }
+                        } else {
+                            //If the topic level policy is null, use the namespace level
+                            managedLedgerConfig
+                                    .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
                         }
-                    } else {
-                        //If the topic level policy is null, use the namespace level
-                        managedLedgerConfig
-                                .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
                     }
 
                     managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index ff45c140f56..d506f712767 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -20,8 +20,14 @@ package org.apache.pulsar.broker.systopic;
 
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pulsar.broker.admin.impl.BrokersBase;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -30,8 +36,13 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -135,4 +146,28 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         Assert.assertNotNull(receive);
     }
 
+    @Test
+    public void testHealthCheckTopicNotOffload() throws Exception {
+        NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopic(topicName.toString(), true).get().get();
+        ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
+        config.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+        admin.brokers().healthcheck(TopicVersion.V2);
+        admin.topics().triggerOffload(topicName.toString(), MessageId.earliest);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0);
+        });
+        LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+        config.setLedgerOffloader(ledgerOffloader);
+        Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
+        admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
+        Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
+                    NullLedgerOffloader.INSTANCE);
+        });
+    }
+
 }