You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2024/04/19 17:31:01 UTC

(pulsar) branch master updated: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 59daac64c21 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
59daac64c21 is described below

commit 59daac64c210f539e733f883edad09d08333aa62
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Apr 19 10:30:55 2024 -0700

    [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
---
 .../pulsar/broker/service/AbstractTopic.java       | 52 +++++++++++++---------
 ...kerInternalClientConfigurationOverrideTest.java | 42 ++++++++++++++++-
 2 files changed, 72 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index e772486fcc6..44a4ca42cea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -220,13 +220,16 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
         topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
                 .updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
-        topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
-        topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
+        topicPolicies.getMaxSubscriptionsPerTopic()
+                .updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic()));
+        topicPolicies.getMaxUnackedMessagesOnConsumer()
+                .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer()));
         topicPolicies.getMaxUnackedMessagesOnSubscription()
-                .updateTopicValue(data.getMaxUnackedMessagesOnSubscription());
-        topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
-        topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
-        topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
+                .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription()));
+        topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic()));
+        topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic()));
+        topicPolicies.getMaxConsumersPerSubscription()
+                .updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription()));
         topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
         topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
         topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateTopicValue(
@@ -237,8 +240,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
                 this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
                         data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
-        topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
-        topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+        topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
+        topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
         topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
         topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
         topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -268,15 +271,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getReplicationClusters().updateNamespaceValue(
                 new ArrayList<>(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
         topicPolicies.getMaxUnackedMessagesOnConsumer()
-                .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer));
         topicPolicies.getMaxUnackedMessagesOnSubscription()
-                .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
-        topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
-        topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
-        topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
-        topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
+        topicPolicies.getMessageTTLInSeconds()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+        topicPolicies.getMaxSubscriptionsPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
+        topicPolicies.getMaxProducersPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic));
+        topicPolicies.getMaxConsumerPerTopic()
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic));
         topicPolicies.getMaxConsumersPerSubscription()
-                .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
+                .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription));
         topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
         topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
         topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
@@ -312,6 +319,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         updateEntryFilters();
     }
 
+    private Integer normalizeValue(Integer policyValue) { // maps a negative policy value to null (null stays null)
+        return policyValue != null && policyValue < 0 ? null : policyValue; // zero and positive values pass through
+    }
+
     private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
         DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
         if (dispatchRate == null) {
@@ -370,12 +381,11 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
         topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
         topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
-        topicPolicies.getRetentionPolicies().updateBrokerValue(new RetentionPolicies(
-                config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
-        topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
-                config.getBrokerDeduplicationSnapshotIntervalSeconds());
-        topicPolicies.getMaxUnackedMessagesOnConsumer()
-                .updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
+        topicPolicies.getRetentionPolicies().updateBrokerValue(
+                new RetentionPolicies(config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
+        topicPolicies.getDeduplicationSnapshotIntervalSeconds()
+                .updateBrokerValue(config.getBrokerDeduplicationSnapshotIntervalSeconds());
+        topicPolicies.getMaxUnackedMessagesOnConsumer().updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
         topicPolicies.getMaxUnackedMessagesOnSubscription()
                 .updateBrokerValue(config.getMaxUnackedMessagesPerSubscription());
         //init backlogQuota
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
index 1b1b383e930..f33202c3c40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
@@ -18,17 +18,21 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertEquals;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-
+import lombok.Cleanup;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -112,4 +116,40 @@ public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBas
         Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
     }
 
+    @Test
+    public void testOldNamespacePolicy() throws Exception {
+        // Negative (-1) namespace limits must resolve to the broker configuration values asserted below.
+        String ns = "prop/oldNsWithDefaultNonNullValues";
+        String topic = "persistent://" + ns + "/t1";
+        Policies policies = new Policies();
+        policies.max_consumers_per_subscription = -1;
+        policies.max_consumers_per_topic = -1;
+        policies.max_producers_per_topic = -1;
+        policies.max_subscriptions_per_topic = -1;
+        policies.max_topics_per_namespace = -1;
+        policies.max_unacked_messages_per_consumer = -1;
+        policies.max_unacked_messages_per_subscription = -1;
+        admin.namespaces().createNamespace(ns, policies);
+        // NOTE(review): presumably the producer is created so the topic is loaded before the lookup - confirm.
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic).create();
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+        assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(),
+                conf.getMaxUnackedMessagesPerSubscription());
+        assertEquals(topicRef.topicPolicies.getMaxConsumersPerSubscription().get(),
+                conf.getMaxConsumersPerSubscription());
+        assertEquals(topicRef.topicPolicies.getMaxConsumerPerTopic().get(),
+                conf.getMaxConsumersPerTopic());
+        assertEquals(topicRef.topicPolicies.getMaxProducersPerTopic().get(),
+                conf.getMaxProducersPerTopic());
+        assertEquals(topicRef.topicPolicies.getMaxSubscriptionsPerTopic().get(),
+                conf.getMaxSubscriptionsPerTopic());
+        assertEquals(topicRef.topicPolicies.getTopicMaxMessageSize().get(),
+                conf.getMaxMessageSize());
+        assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnConsumer().get(),
+                conf.getMaxUnackedMessagesPerConsumer());
+        // NOTE(review): max_topics_per_namespace is set above but never asserted, and getTopicMaxMessageSize
+        // is asserted although no max-message-size field is set on the namespace policies in this test.
+    }
 }