You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/28 08:39:37 UTC
[pulsar] 07/09: [fix][broker] Fix NPE when set `AutoTopicCreationOverride` (#15653)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 42704c6c58833b4dd86820421f62f84e71aa92c3
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Fri May 20 15:21:47 2022 +0800
[fix][broker] Fix NPE when set `AutoTopicCreationOverride` (#15653)
(cherry picked from commit e2afcf0a7ee90908fa647656624aacb9fd93249c)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 9 +++--
.../apache/pulsar/broker/admin/AdminApi2Test.java | 44 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 7c581b2cf83..0b0d02b7a6b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -99,6 +99,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.ValidateResult;
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
@@ -832,9 +833,11 @@ public abstract class NamespacesBase extends AdminResource {
"Invalid configuration for autoTopicCreationOverride. the detail is "
+ validateResult.getErrorInfo());
}
- if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
- throw new RestException(Status.NOT_ACCEPTABLE,
- "Number of partitions should be less than or equal to " + maxPartitions);
+ if (Objects.equals(autoTopicCreationOverride.getTopicType(), TopicType.PARTITIONED.toString())) {
+ if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
+ throw new RestException(Status.NOT_ACCEPTABLE,
+ "Number of partitions should be less than or equal to " + maxPartitions);
+ }
}
}
// Force to read the data s.t. the watch to the cache content is setup.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 445dec0b633..ede133ef92f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -46,6 +46,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -81,6 +82,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
@@ -1614,6 +1616,48 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testAutoTopicCreationOverrideWithMaxNumPartitionsLimit() throws Exception{
+ super.internalCleanup();
+ conf.setMaxNumPartitionsPerPartitionedTopic(10);
+ super.internalSetup();
+ admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(
+ Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("testTenant", tenantInfo);
+ // test non-partitioned
+ admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
+ AutoTopicCreationOverride overridePolicy = AutoTopicCreationOverride
+ .builder().allowAutoTopicCreation(true)
+ .topicType("non-partitioned")
+ .build();
+ admin.namespaces().setAutoTopicCreation("testTenant/ns1", overridePolicy);
+ AutoTopicCreationOverride newOverridePolicy =
+ admin.namespaces().getAutoTopicCreation("testTenant/ns1");
+ assertEquals(overridePolicy, newOverridePolicy);
+ // test partitioned
+ AutoTopicCreationOverride partitionedOverridePolicy = AutoTopicCreationOverride
+ .builder().allowAutoTopicCreation(true)
+ .topicType("partitioned")
+ .defaultNumPartitions(10)
+ .build();
+ admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedOverridePolicy);
+ AutoTopicCreationOverride partitionedNewOverridePolicy =
+ admin.namespaces().getAutoTopicCreation("testTenant/ns1");
+ assertEquals(partitionedOverridePolicy, partitionedNewOverridePolicy);
+ // test partitioned with error
+ AutoTopicCreationOverride partitionedWrongOverridePolicy = AutoTopicCreationOverride
+ .builder().allowAutoTopicCreation(true)
+ .topicType("partitioned")
+ .defaultNumPartitions(123)
+ .build();
+ try {
+ admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedWrongOverridePolicy);
+ fail();
+ } catch (Exception ex) {
+ assertTrue(ex.getCause() instanceof NotAcceptableException);
+ }
+ }
@Test
public void testMaxTopicsPerNamespace() throws Exception {
super.internalCleanup();