You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 16:30:16 UTC
[pulsar] 05/19: [pulsar-admin] Add corresponding get command for namespace (#12322)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bb868772d0c11c1c37822d071a9279543246d9c7
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Mon Nov 8 19:13:02 2021 +0800
[pulsar-admin] Add corresponding get command for namespace (#12322)
### Motivation
CLI `./bin/pulsar-admin` provides many `set` commands for namespace and also provides corresponding `get` commands, but I found that the following `set` commands lack corresponding `get` commands.
- [grant-subscription-permission](https://pulsar.apache.org/tools/pulsar-admin/2.9.0-SNAPSHOT/#-em-grant-subscription-permission-em-)
- [set-auto-topic-creation](https://pulsar.apache.org/tools/pulsar-admin/2.9.0-SNAPSHOT/#-em-set-auto-topic-creation-em-)
- [set-auto-subscription-creation](https://pulsar.apache.org/tools/pulsar-admin/2.9.0-SNAPSHOT/#-em-set-auto-subscription-creation-em-)
- [set-encryption-required](https://pulsar.apache.org/tools/pulsar-admin/2.9.0-SNAPSHOT/#-em-set-encryption-required-em-)
- [set-subscription-auth-mode](https://pulsar.apache.org/tools/pulsar-admin/2.9.0-SNAPSHOT/#-em-set-subscription-auth-mode-em-)
The purpose of this PR is to supplement these `get` commands, as follow:
- subscription-permission
- get-auto-topic-creation
- get-auto-subscription-creation
- get-encryption-required
- get-subscription-auth-mode
(cherry picked from commit d055ebd1186ffae2d2af1382e05b21c60e9f9574)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 24 +++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 65 ++++++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 59 +++++++
.../pulsar/broker/auth/AuthorizationTest.java | 6 +
.../BrokerServiceAutoSubscriptionCreationTest.java | 19 ++-
.../BrokerServiceAutoTopicCreationTest.java | 37 +++--
.../api/AuthorizationProducerConsumerTest.java | 7 +
.../org/apache/pulsar/client/admin/Namespaces.java | 81 ++++++++++
.../client/admin/internal/NamespacesImpl.java | 170 +++++++++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 6 +
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 65 ++++++++
11 files changed, 518 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f2327e2..dd02f67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -830,6 +830,12 @@ public abstract class NamespacesBase extends AdminResource {
});
}
+ protected AutoTopicCreationOverride internalGetAutoTopicCreation() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ);
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.autoTopicCreationOverride;
+ }
+
protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
@@ -902,6 +908,12 @@ public abstract class NamespacesBase extends AdminResource {
});
}
+ protected AutoSubscriptionCreationOverride internalGetAutoSubscriptionCreation() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.READ);
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.autoSubscriptionCreationOverride;
+ }
+
protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncResponse) {
internalSetAutoSubscriptionCreation(asyncResponse, null);
}
@@ -1727,6 +1739,12 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected SubscriptionAuthMode internalGetSubscriptionAuthMode() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ);
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.subscription_auth_mode;
+ }
+
protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
@@ -1745,6 +1763,12 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected Boolean internalGetEncryptionRequired() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.ENCRYPTION, PolicyOperation.READ);
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.encryption_required;
+ }
+
protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
validateNamespacePolicyOperation(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).delayed_delivery_policies;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index dac7e44..51728d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -244,6 +244,22 @@ public class Namespaces extends NamespacesBase {
return policies.auth_policies.getNamespaceAuthentication();
}
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/permissions/subscription")
+ @ApiOperation(value = "Retrieve the permissions for a subscription.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Namespace is not empty")})
+ public Map<String, Set<String>> getPermissionOnSubscription(@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION);
+
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.auth_policies.getSubscriptionAuthentication();
+ }
+
@POST
@Path("/{property}/{cluster}/{namespace}/permissions/{role}")
@ApiOperation(hidden = true, value = "Grant a new permission to a role on a namespace.")
@@ -460,6 +476,18 @@ public class Namespaces extends NamespacesBase {
internalModifyDeduplication(enableDeduplication);
}
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/autoTopicCreation")
+ @ApiOperation(value = "Get autoTopicCreation info in a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
+ public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ return internalGetAutoTopicCreation();
+ }
+
@POST
@Path("/{property}/{cluster}/{namespace}/autoTopicCreation")
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@@ -521,6 +549,18 @@ public class Namespaces extends NamespacesBase {
}
}
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/autoSubscriptionCreation")
+ @ApiOperation(value = "Get autoSubscriptionCreation info in a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
+ public AutoSubscriptionCreationOverride getAutoSubscriptionCreation(@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ return internalGetAutoSubscriptionCreation();
+ }
+
@DELETE
@Path("/{property}/{cluster}/{namespace}/autoSubscriptionCreation")
@ApiOperation(value = "Remove override of broker's allowAutoSubscriptionCreation in a namespace")
@@ -964,6 +1004,18 @@ public class Namespaces extends NamespacesBase {
internalSetSubscriptionAuthMode(subscriptionAuthMode);
}
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/subscriptionAuthMode")
+ @ApiOperation(value = "Get subscription auth mode in a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
+ public SubscriptionAuthMode getSubscriptionAuthMode(@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ return internalGetSubscriptionAuthMode();
+ }
+
@POST
@Path("/{property}/{cluster}/{namespace}/encryptionRequired")
@ApiOperation(hidden = true, value = "Message encryption is required or not for all topics in a namespace")
@@ -977,6 +1029,19 @@ public class Namespaces extends NamespacesBase {
}
@GET
+ @Path("/{property}/{cluster}/{namespace}/encryptionRequired")
+ @ApiOperation(value = "Get message encryption required status in a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
+ public Boolean getEncryptionRequired(@PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateAdminAccessForTenant(property);
+ validateNamespaceName(property, cluster, namespace);
+ return internalGetEncryptionRequired();
+ }
+
+ @GET
@Path("/{property}/{cluster}/{namespace}/maxProducersPerTopic")
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index cf84374..e1e892c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -192,6 +192,21 @@ public class Namespaces extends NamespacesBase {
return policies.auth_policies.getNamespaceAuthentication();
}
+ @GET
+ @Path("/{tenant}/{namespace}/permissions/subscription")
+ @ApiOperation(value = "Retrieve the permissions for a subscription.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Namespace is not empty")})
+ public Map<String, Set<String>> getPermissionOnSubscription(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_PERMISSION);
+
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.auth_policies.getSubscriptionAuthentication();
+ }
+
@POST
@Path("/{tenant}/{namespace}/permissions/{role}")
@ApiOperation(value = "Grant a new permission to a role on a namespace.")
@@ -382,6 +397,17 @@ public class Namespaces extends NamespacesBase {
internalModifyDeduplication(null);
}
+ @GET
+ @Path("/{tenant}/{namespace}/autoTopicCreation")
+ @ApiOperation(value = "Get autoTopicCreation info in a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
+ public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetAutoTopicCreation();
+ }
+
@POST
@Path("/{tenant}/{namespace}/autoTopicCreation")
@ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
@@ -443,6 +469,17 @@ public class Namespaces extends NamespacesBase {
}
}
+ @GET
+ @Path("/{tenant}/{namespace}/autoSubscriptionCreation")
+ @ApiOperation(value = "Get autoSubscriptionCreation info in a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
+ public AutoSubscriptionCreationOverride getAutoSubscriptionCreation(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetAutoSubscriptionCreation();
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/autoSubscriptionCreation")
@ApiOperation(value = "Remove override of broker's allowAutoSubscriptionCreation in a namespace")
@@ -972,6 +1009,17 @@ public class Namespaces extends NamespacesBase {
internalSetSubscriptionAuthMode(subscriptionAuthMode);
}
+ @GET
+ @Path("/{tenant}/{namespace}/subscriptionAuthMode")
+ @ApiOperation(value = "Get subscription auth mode in a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
+ public SubscriptionAuthMode getSubscriptionAuthMode(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetSubscriptionAuthMode();
+ }
+
@POST
@Path("/{tenant}/{namespace}/encryptionRequired")
@ApiOperation(value = "Message encryption is required or not for all topics in a namespace")
@@ -988,6 +1036,17 @@ public class Namespaces extends NamespacesBase {
}
@GET
+ @Path("/{tenant}/{namespace}/encryptionRequired")
+ @ApiOperation(value = "Get message encryption required status in a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
+ public Boolean getEncryptionRequired(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetEncryptionRequired();
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/delayedDelivery")
@ApiOperation(value = "Get delayed delivery messages config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index dcdc602..b7a54d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -201,7 +202,12 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
// tests for subscription auth mode
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "*", EnumSet.of(AuthAction.consume));
+ admin.namespaces().setSubscriptionAuthMode("p1/c1/ns1", SubscriptionAuthMode.None);
+ Assert.assertEquals(admin.namespaces().getSubscriptionAuthMode("p1/c1/ns1"),
+ SubscriptionAuthMode.None);
admin.namespaces().setSubscriptionAuthMode("p1/c1/ns1", SubscriptionAuthMode.Prefix);
+ Assert.assertEquals(admin.namespaces().getSubscriptionAuthMode("p1/c1/ns1"),
+ SubscriptionAuthMode.Prefix);
waitForChange();
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
index c635968..d0eb3bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -98,10 +99,13 @@ public class BrokerServiceAutoSubscriptionCreationTest extends BrokerTestBase {
admin.topics().createNonPartitionedTopic(topicName.toString());
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+ AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = AutoSubscriptionCreationOverride.builder()
+ .allowAutoSubscriptionCreation(true)
+ .build();
pulsar.getAdminClient().namespaces().setAutoSubscriptionCreation(topicName.getNamespace(),
- AutoSubscriptionCreationOverride.builder()
- .allowAutoSubscriptionCreation(true)
- .build());
+ autoSubscriptionCreationOverride);
+ Assert.assertEquals(pulsar.getAdminClient().namespaces().getAutoSubscriptionCreation(topicName.getNamespace()),
+ autoSubscriptionCreationOverride);
// Subscribe operation should be successful
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName(subscriptionName).subscribe();
@@ -117,10 +121,13 @@ public class BrokerServiceAutoSubscriptionCreationTest extends BrokerTestBase {
admin.topics().createNonPartitionedTopic(topicName.toString());
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+ AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = AutoSubscriptionCreationOverride.builder()
+ .allowAutoSubscriptionCreation(false)
+ .build();
pulsar.getAdminClient().namespaces().setAutoSubscriptionCreation(topicName.getNamespace(),
- AutoSubscriptionCreationOverride.builder()
- .allowAutoSubscriptionCreation(false)
- .build());
+ autoSubscriptionCreationOverride);
+ Assert.assertEquals(pulsar.getAdminClient().namespaces().getAutoSubscriptionCreation(topicName.getNamespace()),
+ autoSubscriptionCreationOverride);
try {
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName(subscriptionName).subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 33ed356..1b3ca16 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -168,11 +169,13 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
final String subscriptionName = "test-topic-sub-4";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
- pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
- AutoTopicCreationOverride.builder()
- .allowAutoTopicCreation(true)
- .topicType(TopicType.NON_PARTITIONED.toString())
- .build());
+ AutoTopicCreationOverride autoTopicCreationOverride = AutoTopicCreationOverride.builder()
+ .allowAutoTopicCreation(true)
+ .topicType(TopicType.NON_PARTITIONED.toString())
+ .build();
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(), autoTopicCreationOverride);
+ Assert.assertEquals(pulsar.getAdminClient().namespaces().getAutoTopicCreation(topicName.getNamespace()),
+ autoTopicCreationOverride);
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
@@ -185,10 +188,12 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
final String subscriptionName = "test-topic-sub-5";
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
- pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
- AutoTopicCreationOverride.builder()
- .allowAutoTopicCreation(false)
- .build());
+ AutoTopicCreationOverride autoTopicCreationOverride = AutoTopicCreationOverride.builder()
+ .allowAutoTopicCreation(false)
+ .build();
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(), autoTopicCreationOverride);
+ Assert.assertEquals(pulsar.getAdminClient().namespaces().getAutoTopicCreation(topicName.getNamespace()),
+ autoTopicCreationOverride);
try {
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
@@ -205,12 +210,14 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
final TopicName topicName = TopicName.get(topicString);
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
- pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
- AutoTopicCreationOverride.builder()
- .allowAutoTopicCreation(true)
- .topicType(TopicType.PARTITIONED.toString())
- .defaultNumPartitions(4)
- .build());
+ AutoTopicCreationOverride autoTopicCreationOverride = AutoTopicCreationOverride.builder()
+ .allowAutoTopicCreation(true)
+ .topicType(TopicType.PARTITIONED.toString())
+ .defaultNumPartitions(4)
+ .build();
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(), autoTopicCreationOverride);
+ Assert.assertEquals(pulsar.getAdminClient().namespaces().getAutoTopicCreation(topicName.getNamespace()),
+ autoTopicCreationOverride);
final String subscriptionName = "test-topic-sub-6";
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 65f57ad..5eeac53 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -280,6 +281,9 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
String otherPrincipal = "Principal-1-to-access-sub";
tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName,
Collections.singleton(otherPrincipal));
+ TreeMap<String, Set<String>> permissionOnSubscription = new TreeMap<>();
+ permissionOnSubscription.put(subscriptionName, Collections.singleton(otherPrincipal));
+ Assert.assertEquals(tenantAdmin.namespaces().getPermissionOnSubscription(namespace), permissionOnSubscription);
// now, subscriptionRole doesn't have subscription level access so, it will fail to access subscription
try {
@@ -300,6 +304,9 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
// now, grant subscription-access to subscriptionRole as well
superAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName,
Sets.newHashSet(otherPrincipal, subscriptionRole));
+ TreeMap<String, Set<String>> permissionOnSubscription1 = new TreeMap<>();
+ permissionOnSubscription1.put(subscriptionName, Sets.newHashSet(otherPrincipal, subscriptionRole));
+ Assert.assertEquals(tenantAdmin.namespaces().getPermissionOnSubscription(namespace), permissionOnSubscription1);
sub1Admin.topics().skipAllMessages(topicName, subscriptionName);
sub1Admin.topics().skipMessages(topicName, subscriptionName, 1);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 6ea4bab..fb1b67e 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -688,6 +688,19 @@ public interface Namespaces {
CompletableFuture<Void> revokePermissionsOnNamespaceAsync(String namespace, String role);
/**
+ * Get permission to role to access subscription's admin-api.
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ Map<String, Set<String>> getPermissionOnSubscription(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get permission to role to access subscription's admin-api asynchronously.
+ * @param namespace
+ */
+ CompletableFuture<Map<String, Set<String>>> getPermissionOnSubscriptionAsync(String namespace);
+
+ /**
* Grant permission to role to access subscription's admin-api.
* @param namespace
* @param subscription
@@ -1238,6 +1251,23 @@ public interface Namespaces {
String namespace, AutoTopicCreationOverride autoTopicCreationOverride);
/**
+ * Get the autoTopicCreation info within a namespace.
+ *
+ * @param namespace
+ * @return
+ * @throws PulsarAdminException
+ */
+ AutoTopicCreationOverride getAutoTopicCreation(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the autoTopicCreation info within a namespace asynchronously.
+ *
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<AutoTopicCreationOverride> getAutoTopicCreationAsync(String namespace);
+
+ /**
* Removes the autoTopicCreation policy for a given namespace.
* <p/>
* Allowing the broker to dictate the auto-creation policy.
@@ -1323,6 +1353,23 @@ public interface Namespaces {
String namespace, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride);
/**
+ * Get the autoSubscriptionCreation info within a namespace.
+ *
+ * @param namespace
+ * @return
+ * @throws PulsarAdminException
+ */
+ AutoSubscriptionCreationOverride getAutoSubscriptionCreation(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the autoSubscriptionCreation info within a namespace asynchronously.
+ *
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<AutoSubscriptionCreationOverride> getAutoSubscriptionCreationAsync(String namespace);
+
+ /**
* Sets the subscriptionTypesEnabled policy for a given namespace, overriding broker settings.
*
* Request example:
@@ -2439,6 +2486,23 @@ public interface Namespaces {
void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException;
/**
+ * Get the encryption required status within a namespace.
+ *
+ * @param namespace
+ * @return
+ * @throws PulsarAdminException
+ */
+ Boolean getEncryptionRequiredStatus(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the encryption required status within a namespace asynchronously.
+ *
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Boolean> getEncryptionRequiredStatusAsync(String namespace);
+
+ /**
* Set the encryption required status for all topics within a namespace asynchronously.
* <p/>
* When encryption required is true, the broker will prevent to store unencrypted messages.
@@ -2647,6 +2711,23 @@ public interface Namespaces {
CompletableFuture<Void> setSubscriptionAuthModeAsync(String namespace, SubscriptionAuthMode subscriptionAuthMode);
/**
+ * Get the subscriptionAuthMode within a namespace.
+ *
+ * @param namespace
+ * @return
+ * @throws PulsarAdminException
+ */
+ SubscriptionAuthMode getSubscriptionAuthMode(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the subscriptionAuthMode within a namespace asynchronously.
+ *
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<SubscriptionAuthMode> getSubscriptionAuthModeAsync(String namespace);
+
+ /**
* Get the deduplicationSnapshotInterval for a namespace.
*
* @param namespace
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 610423b..04da2f0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -513,6 +513,39 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
return asyncDeleteRequest(path);
}
+ @Override
+ public Map<String, Set<String>> getPermissionOnSubscription(String namespace) throws PulsarAdminException {
+ try {
+ return getPermissionOnSubscriptionAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Set<String>>> getPermissionOnSubscriptionAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", "subscription");
+ final CompletableFuture<Map<String, Set<String>>> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Map<String, Set<String>>>() {
+ @Override
+ public void completed(Map<String, Set<String>> permissions) {
+ future.complete(permissions);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
@Override
public void grantPermissionOnSubscription(String namespace, String subscription, Set<String> roles)
@@ -996,6 +1029,40 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public AutoTopicCreationOverride getAutoTopicCreation(String namespace) throws PulsarAdminException {
+ try {
+ return getAutoTopicCreationAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<AutoTopicCreationOverride> getAutoTopicCreationAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "autoTopicCreation");
+ final CompletableFuture<AutoTopicCreationOverride> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<AutoTopicCreationOverride>() {
+ @Override
+ public void completed(AutoTopicCreationOverride autoTopicCreationOverride) {
+ future.complete(autoTopicCreationOverride);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void removeAutoTopicCreation(String namespace) throws PulsarAdminException {
try {
removeAutoTopicCreationAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
@@ -1041,6 +1108,41 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public AutoSubscriptionCreationOverride getAutoSubscriptionCreation(String namespace) throws PulsarAdminException {
+ try {
+ return getAutoSubscriptionCreationAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<AutoSubscriptionCreationOverride> getAutoSubscriptionCreationAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "autoSubscriptionCreation");
+ final CompletableFuture<AutoSubscriptionCreationOverride> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<AutoSubscriptionCreationOverride>() {
+ @Override
+ public void completed(AutoSubscriptionCreationOverride autoSubscriptionCreation) {
+ future.complete(autoSubscriptionCreation);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+
+ @Override
public void setSubscriptionTypesEnabled(
String namespace, Set<SubscriptionType> subscriptionTypesEnabled) throws PulsarAdminException {
try {
@@ -2106,6 +2208,40 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public SubscriptionAuthMode getSubscriptionAuthMode(String namespace) throws PulsarAdminException {
+ try {
+ return getSubscriptionAuthModeAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<SubscriptionAuthMode> getSubscriptionAuthModeAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "subscriptionAuthMode");
+ final CompletableFuture<SubscriptionAuthMode> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<SubscriptionAuthMode>() {
+ @Override
+ public void completed(SubscriptionAuthMode subscriptionAuthMode) {
+ future.complete(subscriptionAuthMode);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public void setEncryptionRequiredStatus(String namespace, boolean encryptionRequired) throws PulsarAdminException {
try {
setEncryptionRequiredStatusAsync(namespace, encryptionRequired)
@@ -2128,6 +2264,40 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public Boolean getEncryptionRequiredStatus(String namespace) throws PulsarAdminException {
+ try {
+ return getEncryptionRequiredStatusAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> getEncryptionRequiredStatusAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "encryptionRequired");
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Boolean>() {
+ @Override
+ public void completed(Boolean enabled) {
+ future.complete(enabled);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
public DelayedDeliveryPolicies getDelayedDelivery(String namespace) throws PulsarAdminException {
try {
return getDelayedDeliveryAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 60fb6ac..5cdaba2 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -508,6 +508,9 @@ public class PulsarAdminToolTest {
.topicType(TopicType.NON_PARTITIONED.toString())
.build());
+ namespaces.run(split("get-auto-topic-creation myprop/clust/ns1"));
+ verify(mockNamespaces).getAutoTopicCreation("myprop/clust/ns1");
+
namespaces.run(split("remove-auto-topic-creation myprop/clust/ns1"));
verify(mockNamespaces).removeAutoTopicCreation("myprop/clust/ns1");
@@ -515,6 +518,9 @@ public class PulsarAdminToolTest {
verify(mockNamespaces).setAutoSubscriptionCreation("myprop/clust/ns1",
AutoSubscriptionCreationOverride.builder().allowAutoSubscriptionCreation(true).build());
+ namespaces.run(split("get-auto-subscription-creation myprop/clust/ns1"));
+ verify(mockNamespaces).getAutoSubscriptionCreation("myprop/clust/ns1");
+
namespaces.run(split("remove-auto-subscription-creation myprop/clust/ns1"));
verify(mockNamespaces).removeAutoSubscriptionCreation("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index f6fa123..a326f82 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -235,6 +235,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get permissions to access subscription admin-api")
+ private class SubscriptionPermissions extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(getAdmin().namespaces().getPermissionOnSubscription(namespace));
+ }
+ }
+
@Parameters(commandDescription = "Grant permissions to access subscription admin-api")
private class GrantSubscriptionPermissions extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -588,6 +600,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get autoTopicCreation info for a namespace")
+ private class GetAutoTopicCreation extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(getAdmin().namespaces().getAutoTopicCreation(namespace));
+ }
+ }
+
@Parameters(commandDescription = "Remove override of autoTopicCreation for a namespace")
private class RemoveAutoTopicCreation extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -619,6 +643,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the autoSubscriptionCreation for a namespace")
+ private class GetAutoSubscriptionCreation extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(getAdmin().namespaces().getAutoSubscriptionCreation(namespace));
+ }
+ }
+
@Parameters(commandDescription = "Remove override of autoSubscriptionCreation for a namespace")
private class RemoveAutoSubscriptionCreation extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -1274,6 +1310,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get encryption required for a namespace")
+ private class GetEncryptionRequired extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(getAdmin().namespaces().getEncryptionRequiredStatus(namespace));
+ }
+ }
+
@Parameters(commandDescription = "Get the delayed delivery policy for a namespace")
private class GetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -1406,6 +1454,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get subscriptionAuthMod for a namespace")
+ private class GetSubscriptionAuthMode extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(getAdmin().namespaces().getSubscriptionAuthMode(namespace));
+ }
+ }
+
@Parameters(commandDescription = "Get deduplicationSnapshotInterval for a namespace")
private class GetDeduplicationSnapshotInterval extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -2331,6 +2391,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("grant-permission", new GrantPermissions());
jcommander.addCommand("revoke-permission", new RevokePermissions());
+ jcommander.addCommand("subscription-permission", new SubscriptionPermissions());
jcommander.addCommand("grant-subscription-permission", new GrantSubscriptionPermissions());
jcommander.addCommand("revoke-subscription-permission", new RevokeSubscriptionPermissions());
@@ -2370,9 +2431,11 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("remove-deduplication", new RemoveDeduplication());
jcommander.addCommand("set-auto-topic-creation", new SetAutoTopicCreation());
+ jcommander.addCommand("get-auto-topic-creation", new GetAutoTopicCreation());
jcommander.addCommand("remove-auto-topic-creation", new RemoveAutoTopicCreation());
jcommander.addCommand("set-auto-subscription-creation", new SetAutoSubscriptionCreation());
+ jcommander.addCommand("get-auto-subscription-creation", new GetAutoSubscriptionCreation());
jcommander.addCommand("remove-auto-subscription-creation", new RemoveAutoSubscriptionCreation());
jcommander.addCommand("get-retention", new GetRetention());
@@ -2412,7 +2475,9 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("unsubscribe", new Unsubscribe());
jcommander.addCommand("set-encryption-required", new SetEncryptionRequired());
+ jcommander.addCommand("get-encryption-required", new GetEncryptionRequired());
jcommander.addCommand("set-subscription-auth-mode", new SetSubscriptionAuthMode());
+ jcommander.addCommand("get-subscription-auth-mode", new GetSubscriptionAuthMode());
jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());