You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2024/04/19 17:31:01 UTC
(pulsar) branch master updated: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 59daac64c21 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
59daac64c21 is described below
commit 59daac64c210f539e733f883edad09d08333aa62
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Apr 19 10:30:55 2024 -0700
[fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
---
.../pulsar/broker/service/AbstractTopic.java | 52 +++++++++++++---------
...kerInternalClientConfigurationOverrideTest.java | 42 ++++++++++++++++-
2 files changed, 72 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index e772486fcc6..44a4ca42cea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -220,13 +220,16 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
- topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
- topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
+ topicPolicies.getMaxSubscriptionsPerTopic()
+ .updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic()));
+ topicPolicies.getMaxUnackedMessagesOnConsumer()
+ .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer()));
topicPolicies.getMaxUnackedMessagesOnSubscription()
- .updateTopicValue(data.getMaxUnackedMessagesOnSubscription());
- topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
- topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
- topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
+ .updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription()));
+ topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic()));
+ topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic()));
+ topicPolicies.getMaxConsumersPerSubscription()
+ .updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription()));
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateTopicValue(
@@ -237,8 +240,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
- topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
- topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+ topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
+ topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -268,15 +271,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getReplicationClusters().updateNamespaceValue(
new ArrayList<>(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
topicPolicies.getMaxUnackedMessagesOnConsumer()
- .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
+ .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer));
topicPolicies.getMaxUnackedMessagesOnSubscription()
- .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
- topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
- topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
- topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
- topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
+ .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
+ topicPolicies.getMessageTTLInSeconds()
+ .updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+ topicPolicies.getMaxSubscriptionsPerTopic()
+ .updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
+ topicPolicies.getMaxProducersPerTopic()
+ .updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic));
+ topicPolicies.getMaxConsumerPerTopic()
+ .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic));
topicPolicies.getMaxConsumersPerSubscription()
- .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
+ .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription));
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
@@ -312,6 +319,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
updateEntryFilters();
}
+ private Integer normalizeValue(Integer policyValue) {
+ return policyValue != null && policyValue < 0 ? null : policyValue;
+ }
+
private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
if (dispatchRate == null) {
@@ -370,12 +381,11 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
- topicPolicies.getRetentionPolicies().updateBrokerValue(new RetentionPolicies(
- config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
- topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
- config.getBrokerDeduplicationSnapshotIntervalSeconds());
- topicPolicies.getMaxUnackedMessagesOnConsumer()
- .updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
+ topicPolicies.getRetentionPolicies().updateBrokerValue(
+ new RetentionPolicies(config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
+ topicPolicies.getDeduplicationSnapshotIntervalSeconds()
+ .updateBrokerValue(config.getBrokerDeduplicationSnapshotIntervalSeconds());
+ topicPolicies.getMaxUnackedMessagesOnConsumer().updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
topicPolicies.getMaxUnackedMessagesOnSubscription()
.updateBrokerValue(config.getMaxUnackedMessagesPerSubscription());
//init backlogQuota
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
index 1b1b383e930..f33202c3c40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerInternalClientConfigurationOverrideTest.java
@@ -18,17 +18,21 @@
*/
package org.apache.pulsar.broker.service;
+import static org.testng.Assert.assertEquals;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.Policies;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
+import lombok.Cleanup;
import java.util.Optional;
import java.util.Properties;
@@ -112,4 +116,40 @@ public class BrokerInternalClientConfigurationOverrideTest extends BrokerTestBas
Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
}
+ @Test
+ public void testOldNamespacePolicy() throws Exception {
+
+ String ns = "prop/oldNsWithDefaultNonNullValues";
+ String topic = "persistent://" + ns + "/t1";
+ Policies policies = new Policies();
+ policies.max_consumers_per_subscription = -1;
+ policies.max_consumers_per_topic = -1;
+ policies.max_producers_per_topic = -1;
+ policies.max_subscriptions_per_topic = -1;
+ policies.max_topics_per_namespace = -1;
+ policies.max_unacked_messages_per_consumer = -1;
+ policies.max_unacked_messages_per_subscription = -1;
+ admin.namespaces().createNamespace(ns, policies);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic).create();
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+ assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(),
+ conf.getMaxUnackedMessagesPerSubscription());
+ assertEquals(topicRef.topicPolicies.getMaxConsumersPerSubscription().get(),
+ conf.getMaxConsumersPerSubscription());
+ assertEquals(topicRef.topicPolicies.getMaxConsumerPerTopic().get(),
+ conf.getMaxConsumersPerTopic());
+ assertEquals(topicRef.topicPolicies.getMaxProducersPerTopic().get(),
+ conf.getMaxProducersPerTopic());
+ assertEquals(topicRef.topicPolicies.getMaxSubscriptionsPerTopic().get(),
+ conf.getMaxSubscriptionsPerTopic());
+ assertEquals(topicRef.topicPolicies.getTopicMaxMessageSize().get(),
+ conf.getMaxMessageSize());
+ assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnConsumer().get(),
+ conf.getMaxUnackedMessagesPerConsumer());
+
+
+ }
}