You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/06 07:22:26 UTC
[pulsar] branch master updated: [fix][broker] Avoid heartbeat topic to offload. (#15008)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cdb67e43622 [fix][broker] Avoid heartbeat topic to offload. (#15008)
cdb67e43622 is described below
commit cdb67e436225fce52105986c8f3906ed331f6f4d
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Apr 6 15:22:19 2022 +0800
[fix][broker] Avoid heartbeat topic to offload. (#15008)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 +++
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 2 +-
.../pulsar/broker/service/BrokerService.java | 28 ++++++++++-------
.../systopic/PartitionedSystemTopicTest.java | 35 ++++++++++++++++++++++
4 files changed, 57 insertions(+), 12 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7eccca1c17a..37858afe3d8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2847,6 +2847,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
+ if (config.getLedgerOffloader() != null && config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE) {
+ callback.offloadFailed(new ManagedLedgerException("NullLedgerOffloader"), ctx);
+ return;
+ }
PositionImpl requestOffloadTo = (PositionImpl) pos;
if (!isValidPosition(requestOffloadTo)
// Also consider the case where the last ledger is currently
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index d0da8e65b51..a59f53d4575 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -85,7 +85,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
ledger.offloadPrefix(p);
fail("Should have thrown an exception");
} catch (ManagedLedgerException e) {
- assertEquals(e.getCause().getClass(), CompletionException.class);
+ assertEquals(e.getMessage(), "NullLedgerOffloader");
}
assertEquals(ledger.getLedgersInfoAsList().size(), 5);
assertEquals(ledger.getLedgersInfoAsList().stream()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 530bfd067ed..8829841db6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -87,6 +87,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
@@ -102,6 +103,7 @@ import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
@@ -1604,18 +1606,22 @@ public class BrokerService implements Closeable {
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
- pulsar().createManagedLedgerOffloader(offloadPolicies);
- managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
+ if (NamespaceService.isSystemServiceNamespace(namespace.toString())) {
+ managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
+ } else {
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader =
+ pulsar().createManagedLedgerOffloader(offloadPolicies);
+ managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ //If the topic level policy is null, use the namespace level
+ managedLedgerConfig
+ .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
- } else {
- //If the topic level policy is null, use the namespace level
- managedLedgerConfig
- .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index ff45c140f56..d506f712767 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -20,8 +20,14 @@ package org.apache.pulsar.broker.systopic;
import com.google.common.collect.Sets;
import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pulsar.broker.admin.impl.BrokersBase;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -30,8 +36,13 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -135,4 +146,28 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
Assert.assertNotNull(receive);
}
+ @Test
+ public void testHealthCheckTopicNotOffload() throws Exception { // Checks the health-check topic offloads nothing and ends up with NullLedgerOffloader after a policy update.
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+ pulsar.getConfig());
+ TopicName topicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+ .getTopic(topicName.toString(), true).get().get(); // presumably `true` means create-if-missing; confirm against BrokerService#getTopic
+ ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig();
+ config.setLedgerOffloader(NullLedgerOffloader.INSTANCE); // explicitly install the null offloader before triggering an offload
+ admin.brokers().healthcheck(TopicVersion.V2); // presumably exercises the health-check topic so it has data; verify
+ admin.topics().triggerOffload(topicName.toString(), MessageId.earliest);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0); // nothing may have been offloaded
+ });
+ LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+ config.setLedgerOffloader(ledgerOffloader); // swap in a mock so a later reset to NullLedgerOffloader is observable
+ Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
+ admin.topicPolicies().setMaxConsumers(topicName.toString(), 2); // presumably a policy update makes the broker rebuild the ledger config; confirm
+ Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> { // NOTE(review): TimeUnit import is not visible in this diff; confirm it is already imported
+ Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
+ NullLedgerOffloader.INSTANCE); // the mock must have been replaced by the null offloader
+ });
+ }
+
}