You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/13 06:50:01 UTC
[pulsar] 01/02: [fix][broker] Fix system service namespace create internal event topic. (#17867)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 760b1adf56f5424dedfed4c41805c8874463f29b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Oct 13 13:17:16 2022 +0800
[fix][broker] Fix system service namespace create internal event topic. (#17867)
---
.../pulsar/broker/service/BrokerService.java | 3 ++-
.../SystemTopicBasedTopicPoliciesService.java | 4 +++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +-
.../systopic/PartitionedSystemTopicTest.java | 30 +++++++++++++++++++---
4 files changed, 33 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b704044e139..b437f976837 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1507,7 +1507,8 @@ public class BrokerService implements Closeable {
RetentionPolicies retentionPolicies = null;
OffloadPoliciesImpl topicLevelOffloadPolicies = null;
- if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
+ if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+ && !NamespaceService.isSystemServiceNamespace(namespace.toString())) {
try {
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index f5db6e2311b..26879bff3ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -101,6 +101,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
TopicPolicies policies) {
+ if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+ return CompletableFuture.failedFuture(
+ new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic"));
+ }
CompletableFuture<Void> result = new CompletableFuture<>();
try {
createSystemTopicFactoryIfNeeded();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 31b3ee5355a..bb8749e9226 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -524,7 +524,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
* @param namespaceName
* @throws Exception
*/
- @Test(dataProvider = "namespaceNames", timeOut = 10000)
+ @Test(dataProvider = "namespaceNames", timeOut = 30000)
public void testResetCursorOnPosition(String namespaceName) throws Exception {
final String topicName = "persistent://prop-xyz/use/" + namespaceName + "/resetPosition";
final int totalProducedMessages = 50;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d4ed12573f3..d2ae23bb6c9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.ListTopicsOptions;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -172,10 +173,31 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
config.setLedgerOffloader(ledgerOffloader);
Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
- admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
- Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
- Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
- NullLedgerOffloader.INSTANCE);
+ }
+
+ @Test
+ public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
+ admin.brokers().healthcheck(TopicVersion.V2);
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+ pulsar.getConfig());
+ TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ Optional<Topic> optionalTopic = pulsar.getBrokerService()
+ .getTopic(topicName.getPartition(1).toString(), false).join();
+ Assert.assertTrue(optionalTopic.isEmpty());
+ }
+
+ @Test
+ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
+ admin.brokers().healthcheck(TopicVersion.V2);
+ NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+ pulsar.getConfig());
+ TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ for (int partition = 0; partition < PARTITIONS; partition ++) {
+ pulsar.getBrokerService()
+ .getTopic(topicName.getPartition(partition).toString(), true).join();
+ }
+ Assert.assertThrows(PulsarAdminException.ConflictException.class, () -> {
+ admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
});
}