Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/07 04:56:16 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request, #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Technoboy- opened a new pull request, #15956:
URL: https://github.com/apache/pulsar/pull/15956

   ### Motivation
   Currently, operating the schema API requires admin permission. This requirement dates from when the schema API was first added; see #1381.
   Later, when authentication granularity was added in #6428, the schema API part was not changed, so it still requires the admin permission today.
   
   The schema should follow the permissions of the topic, so change the related method permission to `produce/consume`.
   
   ### Modifications
   - `get` needs `consume` permission
   - `post` needs `produce/consume` permission
   - `delete` needs `produce` permission
   
   ### Documentation
   
   - [x] `doc-not-needed` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r890837077


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##########
@@ -749,9 +747,7 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyn
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ)

Review Comment:
   I have rolled back this change. I will open a new patch to solve the `PolicyOperation` issue.
   Currently, only an admin has permission to call `PolicyOperation`-related methods.





[GitHub] [pulsar] eolivelli commented on pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1151047625

   This is an important security-related change.
   Did I miss some discussion on dev@?




[GitHub] [pulsar] codelipenghui commented on pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1151175198

   @eolivelli This looks like a bug. Over the binary protocol, a role with the `produce` or `consume` permission can get the schema, but over the REST API it can't.
   
   Also, it is unreasonable that only the tenant admin can get the schema. It looks like the PR fixes the bug `Have the produce/consume permission but not able to get schema`.




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r890826231


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java:
##########
@@ -163,7 +164,7 @@ public void deleteSchema(boolean authoritative, AsyncResponse response, boolean
     }
 
     public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
+        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE)

Review Comment:
   ok. rollback.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java:
##########
@@ -97,7 +98,7 @@ public void getSchema(boolean authoritative, AsyncResponse response) {
     }
 
     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
+        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.CONSUME)

Review Comment:
   > For both PRODUCE and CONSUME, it can get schema, not only the consumer.
   
   fixed





[GitHub] [pulsar] gaoran10 commented on pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1149373906

   @nodece It seems that `PRODUCE` and `CONSUME` should include the `GET_SCHEMA` permission; otherwise the `PRODUCE` and `CONSUME` permissions can't work properly.




[GitHub] [pulsar] yuruguo commented on pull request #15956: [fix][admin] Fix producer/consume permission can’t get schema

Posted by GitBox <gi...@apache.org>.
yuruguo commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1151890927

   @Technoboy- @eolivelli @codelipenghui This PR should be related to issue [12419](https://github.com/apache/pulsar/issues/12419). The problem to be solved is that a `role` that has the `lookup` permission on a topic should also have permission to operate on the topic's schema.




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r890783653


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java:
##########
@@ -97,7 +98,7 @@ public void getSchema(boolean authoritative, AsyncResponse response) {
     }
 
     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
+        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.CONSUME)

Review Comment:
   Could we introduce a new operation `GET_SCHEMA`, and then use `canProduce` and `canConsume` to verify it?
   
   The combination mode may bring more flexibility.
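
One way to read this suggestion is sketched below: a dedicated `GET_SCHEMA` operation that is allowed when the role can produce or can consume. This is a minimal illustration only. The class `GetSchemaAuthSketch`, its in-memory grant table, and the `grant`/`allow` methods are hypothetical and are not Pulsar's authorization classes; the "either permission is enough" rule is an assumption taken from the discussion in this thread.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Pulsar code: GET_SCHEMA is derived from produce/consume grants.
final class GetSchemaAuthSketch {
    public enum AuthAction { produce, consume }
    public enum TopicOperation { PRODUCE, CONSUME, GET_SCHEMA }

    // role -> actions granted on a single topic (in-memory stand-in for real policies)
    private final Map<String, Set<AuthAction>> grants = new HashMap<>();

    public void grant(String role, AuthAction action) {
        grants.computeIfAbsent(role, r -> EnumSet.noneOf(AuthAction.class)).add(action);
    }

    public boolean canProduce(String role) {
        return grants.getOrDefault(role, EnumSet.noneOf(AuthAction.class))
                .contains(AuthAction.produce);
    }

    public boolean canConsume(String role) {
        return grants.getOrDefault(role, EnumSet.noneOf(AuthAction.class))
                .contains(AuthAction.consume);
    }

    public boolean allow(String role, TopicOperation operation) {
        switch (operation) {
            case PRODUCE:
                return canProduce(role);
            case CONSUME:
                return canConsume(role);
            case GET_SCHEMA:
                // Assumption: either permission is sufficient to read the schema.
                return canProduce(role) || canConsume(role);
            default:
                return false;
        }
    }
}
```

Keeping `GET_SCHEMA` as its own operation means the rule can later be tightened or loosened in one place without touching the produce and consume checks.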





[GitHub] [pulsar] Technoboy- commented on pull request #15956: [fix][admin] Fix producer/consume permission can’t get schema

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1251914400

   > This PR also needs to be cherry-picked to branch-2.9.
   
   Done. 





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r890837284


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java:
##########
@@ -163,7 +164,7 @@ public void deleteSchema(boolean authoritative, AsyncResponse response, boolean
     }
 
     public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
+        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE)

Review Comment:
   yes, rolled back.





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r891846708


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {

Review Comment:
   Just a trivial suggestion, maybe we could add a data provider to test different cases.
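
A data-driven layout for such a test could look roughly like the sketch below. It is written in plain Java so that it runs without TestNG: `schemaAuthCases()` returns the `Object[][]` row shape that a TestNG `@DataProvider` method returns, and `checkCase` plays the role of a `@Test(dataProvider = ...)` method. The class name, the rows, and the stand-in `canGetSchema` check are all hypothetical; the expected outcomes assume that either `produce` or `consume` is enough to read a schema, as discussed in this thread.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: table-driven permission cases in the shape of a TestNG data provider.
final class SchemaAuthCasesSketch {
    public enum AuthAction { produce, consume }

    // Each row: case name, granted actions, whether reading the schema should succeed.
    // With TestNG this method would be annotated with @DataProvider(name = "schemaAuthCases").
    public static Object[][] schemaAuthCases() {
        return new Object[][] {
            {"no permission", EnumSet.noneOf(AuthAction.class), false},
            {"consume only", EnumSet.of(AuthAction.consume), true},
            {"produce only", EnumSet.of(AuthAction.produce), true},
            {"produce and consume", EnumSet.of(AuthAction.produce, AuthAction.consume), true},
        };
    }

    // Stand-in for the real admin call; assumes either permission is sufficient.
    public static boolean canGetSchema(Set<AuthAction> granted) {
        return granted.contains(AuthAction.produce) || granted.contains(AuthAction.consume);
    }

    // With TestNG this would be a @Test(dataProvider = "schemaAuthCases") method.
    public static void checkCase(String name, Set<AuthAction> granted, boolean expected) {
        if (canGetSchema(granted) != expected) {
            throw new AssertionError("case failed: " + name);
        }
    }

    // Runs every row and returns how many cases were checked.
    @SuppressWarnings("unchecked")
    public static int runAll() {
        int checked = 0;
        for (Object[] row : schemaAuthCases()) {
            checkCase((String) row[0], (Set<AuthAction>) row[1], (Boolean) row[2]);
            checked++;
        }
        return checked;
    }
}
```

Each new permission combination then becomes one extra row instead of another `try`/`fail`/`catch` block in the test body.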





[GitHub] [pulsar] nodece commented on pull request #15956: [fix][admin] Fix producer/consume permission can’t get schema

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1247714480

   This PR also needs to be cherry-picked to branch-2.9.
   




[GitHub] [pulsar] nodece commented on pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1149579742

   OK




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r893131436


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+        SchemaInfo readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName, 0);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName, 0);
+        assertEquals(readSi, si);
+        List<SchemaInfo> allSchemas = adminWithProducerConsumePermission.schemas().getAllSchemas(topicName);
+        assertEquals(allSchemas.size(), 1);
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2);

Review Comment:
   fixed



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        boolean compatibility = adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility();
+        assertTrue(compatibility);
+        try {
+            adminWithoutPermission.schemas().getVersionBySchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        Long versionBySchema = adminWithProducerConsumePermission.schemas().getVersionBySchema(topicName, si);
+        assertEquals(versionBySchema, Long.valueOf(0L));
+        try {
+            adminWithoutPermission.schemas().deleteSchema(topicName);

Review Comment:
   Fixed.





[GitHub] [pulsar] nodece commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r892074471


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));

Review Comment:
   ```suggestion
           admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.consume));
   ```
   Shouldn't the role here be `producer+consumer`, to match the subject of `PRODUCE_CONSUME_TOKEN`?
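
The point of the suggestion above can be illustrated with a small model. This is only a sketch under stated assumptions: it assumes that permissions are looked up by the role name carried in the token subject, and `RoleGrantSketch`, `Action`, `grant` and `isAllowed` are hypothetical names, not Pulsar's API or its actual authorization code. It shows that a grant made to a role named `consumer` does not, by itself, authorize a client whose role is `producer+consumer`.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class RoleGrantSketch {
    // Stand-in for the two AuthAction values used in the test (hypothetical type).
    public enum Action { PRODUCE, CONSUME }

    // Hypothetical in-memory model: one set of allowed actions per role name.
    private final Map<String, Set<Action>> grantsByRole = new HashMap<>();

    // Records the actions for a role. This sketch simply overwrites earlier grants;
    // how the broker combines repeated grants is not modeled here.
    public void grant(String role, Set<Action> actions) {
        grantsByRole.put(role, EnumSet.copyOf(actions));
    }

    // A role is allowed only if a grant exists under exactly that role name.
    public boolean isAllowed(String role, Action action) {
        return grantsByRole.getOrDefault(role, EnumSet.noneOf(Action.class)).contains(action);
    }

    public static void main(String[] args) {
        RoleGrantSketch topic = new RoleGrantSketch();

        // Grant to a role that no client in the test authenticates as.
        topic.grant("consumer", EnumSet.of(Action.CONSUME));
        System.out.println(topic.isAllowed("producer+consumer", Action.CONSUME)); // false

        // Grant to the role that matches the token subject.
        topic.grant("producer+consumer", EnumSet.of(Action.CONSUME));
        System.out.println(topic.isAllowed("producer+consumer", Action.CONSUME)); // true
    }
}
```

In the test under review the `producer+consumer` role already holds both actions from the earlier `grantPermission` call, so the grant to `consumer` probably does not change the outcome; it just exercises a role that no client uses.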



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);

Review Comment:
   Consider using `assertThrows` here instead of the try/`fail`/catch block.
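
For readers unfamiliar with the pattern, here is a minimal sketch of what the suggestion amounts to. TestNG provides `Assert.assertThrows(Class, ThrowingRunnable)`; so that the example runs with the JDK alone, the block below defines a small stand-in with the same shape rather than importing TestNG. `AssertThrowsSketch` and `createSchemaWithoutPermission` are hypothetical names, and the `IllegalStateException` merely stands in for whatever exception the admin client raises when the broker rejects the call.

```java
public class AssertThrowsSketch {
    // Same shape as TestNG's Assert.ThrowingRunnable; redefined here as a stand-in.
    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Throwable;
    }

    // Stand-in for TestNG's Assert.assertThrows: passes only if the body throws
    // an instance of the expected type, and fails otherwise.
    public static <T extends Throwable> void assertThrows(Class<T> expected, ThrowingRunnable body) {
        try {
            body.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return; // the expected failure happened
            }
            throw new AssertionError("Expected " + expected.getSimpleName() + " but got " + t, t);
        }
        throw new AssertionError("Expected " + expected.getSimpleName() + " but nothing was thrown");
    }

    // Hypothetical stand-in for an admin call that the broker rejects.
    public static void createSchemaWithoutPermission() {
        throw new IllegalStateException("not authorized");
    }

    public static void main(String[] args) {
        // Replaces: try { call(); fail("Should have failed"); } catch (Exception ignore) { }
        assertThrows(IllegalStateException.class, AssertThrowsSketch::createSchemaWithoutPermission);

        // A body that does not throw makes the assertion itself fail.
        boolean rejected = false;
        try {
            assertThrows(IllegalStateException.class, () -> { });
        } catch (AssertionError e) {
            rejected = true;
        }
        System.out.println(rejected);
    }
}
```

Besides being shorter, naming the expected exception type is stricter than `catch (Exception ignore)`, which also accepts failures unrelated to authorization. In the real test the expected type would presumably be a `PulsarAdminException` subtype, but that choice is left to the author.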



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+        SchemaInfo readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName, 0);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName, 0);
+        assertEquals(readSi, si);
+        List<SchemaInfo> allSchemas = adminWithProducerConsumePermission.schemas().getAllSchemas(topicName);
+        assertEquals(allSchemas.size(), 1);
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        boolean compatibility = adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility();
+        assertTrue(compatibility);
+        try {
+            adminWithoutPermission.schemas().getVersionBySchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        Long versionBySchema = adminWithProducerConsumePermission.schemas().getVersionBySchema(topicName, si);
+        assertEquals(versionBySchema, Long.valueOf(0L));
+        try {
+            adminWithoutPermission.schemas().deleteSchema(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));

Review Comment:
   ```suggestion
           admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce));
   ```
   Shouldn't the role here be `producer+consumer` as well?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2);

Review Comment:
   Consider using `assertThrows` here instead of the try/`fail`/catch block.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);

Review Comment:
   Use `assertThrows` here instead of the `try`/`fail`/`catch` pattern.
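To illustrate the suggestion, here is a minimal, self-contained sketch of replacing a `try { ...; fail(...); } catch (Exception ignore) { }` block with a single `assertThrows` call. In the real test the helper would come from `org.testng.Assert.assertThrows`. So that the sketch runs without TestNG or Pulsar on the classpath, it defines a small local helper of the same shape. `NotAuthorizedException` and `getSchemaInfo` below are hypothetical stand-ins, and the exception type the broker actually returns is an assumption.

```java
// Hypothetical stand-ins so the sketch runs without Pulsar or TestNG on the classpath.
final class AssertThrowsSketch {

    /** Stand-in for the exception an unauthorized admin call would raise (assumed type). */
    static final class NotAuthorizedException extends Exception {
        NotAuthorizedException(String message) {
            super(message);
        }
    }

    /** Same shape as TestNG's Assert.ThrowingRunnable. */
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    /**
     * Minimal local equivalent of org.testng.Assert.assertThrows(Class, ThrowingRunnable):
     * fails unless the runnable throws an instance of the expected type.
     */
    static <T extends Throwable> void assertThrows(Class<T> expected, ThrowingRunnable runnable) {
        try {
            runnable.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return; // expected failure observed
            }
            throw new AssertionError("Expected " + expected.getSimpleName()
                    + " but got " + t.getClass().getSimpleName(), t);
        }
        throw new AssertionError("Expected " + expected.getSimpleName() + " but nothing was thrown");
    }

    /** Hypothetical stand-in for adminWithoutPermission.schemas().getSchemaInfo(topicName). */
    static String getSchemaInfo(String topic, boolean authorized) throws NotAuthorizedException {
        if (!authorized) {
            throw new NotAuthorizedException("not authorized on " + topic);
        }
        return "BOOLEAN";
    }

    public static void main(String[] args) throws Exception {
        String topicName = "persistent://schematest/test/testCreateSchema";
        // Replaces: try { call(); fail("Should have failed"); } catch (Exception ignore) { }
        assertThrows(NotAuthorizedException.class, () -> getSchemaInfo(topicName, false));
        System.out.println(getSchemaInfo(topicName, true));
    }
}
```

Compared with catching `Exception` and ignoring it, naming the expected exception type also keeps an unrelated failure (for example a connection error) from being mistaken for the authorization failure the test is meant to check.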



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+        SchemaInfo readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName, 0);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName, 0);
+        assertEquals(readSi, si);
+        List<SchemaInfo> allSchemas = adminWithProducerConsumePermission.schemas().getAllSchemas(topicName);
+        assertEquals(allSchemas.size(), 1);
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        boolean compatibility = adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility();
+        assertTrue(compatibility);
+        try {
+            adminWithoutPermission.schemas().getVersionBySchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        Long versionBySchema = adminWithProducerConsumePermission.schemas().getVersionBySchema(topicName, si);
+        assertEquals(versionBySchema, Long.valueOf(0L));
+        try {
+            adminWithoutPermission.schemas().deleteSchema(topicName);

Review Comment:
   Use `assertThrows` here instead of the `try`/`fail`/`catch` pattern.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+        SchemaInfo readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName, 0);

Review Comment:
   Use `assertThrows` here instead of the `try`/`fail`/`catch` pattern.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+        SchemaInfo readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName, 0);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName, 0);
+        assertEquals(readSi, si);
+        List<SchemaInfo> allSchemas = adminWithProducerConsumePermission.schemas().getAllSchemas(topicName);
+        assertEquals(allSchemas.size(), 1);
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        boolean compatibility = adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility();
+        assertTrue(compatibility);
+        try {
+            adminWithoutPermission.schemas().getVersionBySchema(topicName, si);

Review Comment:
   Use `assertThrows` here instead of the `try`/`fail`/`catch` pattern.





[GitHub] [pulsar] nodece commented on pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1148334061

   I suggest introducing `GET_SCHEMA` in `org.apache.pulsar.common.policies.data.TopicOperation`.




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r890784868


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##########
@@ -749,9 +747,7 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyn
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ)

Review Comment:
   +1. Could this change break the original behaviour?





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r890825777


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java:
##########
@@ -97,7 +98,7 @@ public void getSchema(boolean authoritative, AsyncResponse response) {
     }
 
     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
+        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.CONSUME)

Review Comment:
   Ah, now we can use `GET_METADATA` instead.
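To make the difference between checking `CONSUME` and checking `GET_METADATA` concrete, here is a small sketch of an operation-to-permission check. It is a hypothetical model, not Pulsar's authorization provider: the `Action` and `Operation` enums are local stand-ins, and the rule that `GET_METADATA` is satisfied by either a produce or a consume grant is an assumption made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

final class SchemaPermissionSketch {

    /** Mirrors the AuthAction values granted in the test above. */
    enum Action { produce, consume }

    /** Hypothetical subset of the topic operations discussed in this thread. */
    enum Operation { PRODUCE, CONSUME, GET_METADATA }

    /**
     * Assumed rule, not Pulsar's actual authorization provider: PRODUCE and CONSUME need the
     * matching grant, while GET_METADATA is satisfied by either grant.
     */
    static boolean isAllowed(Operation op, Set<Action> granted) {
        switch (op) {
            case PRODUCE:
                return granted.contains(Action.produce);
            case CONSUME:
                return granted.contains(Action.consume);
            case GET_METADATA:
                return granted.contains(Action.produce) || granted.contains(Action.consume);
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        Set<Action> producerOnly = EnumSet.of(Action.produce);
        // Under this model a producer-only role fails a CONSUME check
        // but passes a GET_METADATA check.
        System.out.println(isAllowed(Operation.CONSUME, producerOnly));
        System.out.println(isAllowed(Operation.GET_METADATA, producerOnly));
    }
}
```

Under these assumptions, guarding the schema read with `GET_METADATA` lets a producer-only role fetch the schema, whereas guarding it with `CONSUME` would not.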





[GitHub] [pulsar] Technoboy- merged pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #15956:
URL: https://github.com/apache/pulsar/pull/15956




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r893131574


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+        SchemaInfo readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName, 0);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName, 0);
+        assertEquals(readSi, si);
+        List<SchemaInfo> allSchemas = adminWithProducerConsumePermission.schemas().getAllSchemas(topicName);
+        assertEquals(allSchemas.size(), 1);
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        boolean compatibility = adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility();
+        assertTrue(compatibility);
+        try {
+            adminWithoutPermission.schemas().getVersionBySchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        Long versionBySchema = adminWithProducerConsumePermission.schemas().getVersionBySchema(topicName, si);
+        assertEquals(versionBySchema, Long.valueOf(0L));
+        try {
+            adminWithoutPermission.schemas().deleteSchema(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));

Review Comment:
   fixed





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r890778406


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java:
##########
@@ -97,7 +98,7 @@ public void getSchema(boolean authoritative, AsyncResponse response) {
     }
 
     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
+        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.CONSUME)

Review Comment:
   A role with either PRODUCE or CONSUME permission should be able to get the schema, not only the consumer.
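
To make the point above concrete, here is a minimal, self-contained sketch of an "either produce or consume" check. The class `SchemaReadAccess`, its `Action` enum, and `canReadSchema` are hypothetical names made up for illustration; they are not the Pulsar broker's authorization API, and the real check would live behind whatever validation method the PR ends up using.

```java
import java.util.Set;

/** Hypothetical illustration only; not part of the Pulsar code base. */
final class SchemaReadAccess {

    /** Topic-level actions a role may have been granted (assumed names). */
    enum Action { PRODUCE, CONSUME, FUNCTIONS }

    private SchemaReadAccess() {
    }

    /**
     * Returns true when the role holds PRODUCE or CONSUME on the topic.
     * Both producers and consumers need the schema, so either grant suffices.
     */
    static boolean canReadSchema(Set<Action> granted) {
        return granted.contains(Action.PRODUCE) || granted.contains(Action.CONSUME);
    }
}
```

With this sketch, `SchemaReadAccess.canReadSchema(EnumSet.of(Action.PRODUCE))` is accepted just like the consume-only case, while a role with no grant on the topic is rejected.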



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java:
##########
@@ -163,7 +164,7 @@ public void deleteSchema(boolean authoritative, AsyncResponse response, boolean
     }
 
     public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
+        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE)

Review Comment:
   Can we change only the get method in this PR? If we allow PRODUCE to upload and delete schemas, that will bypass the broker's schema auto-upload configuration.
   
   Until we introduce dedicated update-schema and delete-schema operations, we can keep them available only to the tenant admin.
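
The split suggested above could be sketched roughly as follows: reads are open to produce/consume roles, while upload and delete stay with the tenant admin. `SchemaOperationPolicy`, `SchemaOp`, and `Grant` are hypothetical names chosen for this example and do not correspond to existing Pulsar classes; the sketch only models the decision table, not how the broker resolves a role's grants.

```java
import java.util.Set;

/** Hypothetical decision table for schema REST operations; illustration only. */
final class SchemaOperationPolicy {

    /** The three schema operations discussed in this PR (assumed names). */
    enum SchemaOp { GET, POST, DELETE }

    /** What the calling role has been granted (assumed names). */
    enum Grant { PRODUCE, CONSUME, TENANT_ADMIN }

    private SchemaOperationPolicy() {
    }

    static boolean isAllowed(SchemaOp op, Set<Grant> grants) {
        // A tenant admin keeps access to every schema operation.
        if (grants.contains(Grant.TENANT_ADMIN)) {
            return true;
        }
        // Only reads are opened up to ordinary produce/consume roles.
        if (op == SchemaOp.GET) {
            return grants.contains(Grant.PRODUCE) || grants.contains(Grant.CONSUME);
        }
        // POST and DELETE stay admin-only, so a producer cannot sidestep
        // the broker's schema auto-upload setting through the admin API.
        return false;
    }
}
```

Under this table a producer can call GET but is still refused on POST and DELETE, which matches the narrower scope proposed for this PR.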



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##########
@@ -749,9 +747,7 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyn
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ)

Review Comment:
   Why should we change this? Users can have their own `AuthorizationProvider` that supports the `SCHEMA_COMPATIBILITY_STRATEGY` operation. Shouldn't we fix the `PulsarAuthorizationProvider` instead?
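
A rough sketch of why the call site matters: if the policy check is still routed through the provider, a custom provider can decide the `SCHEMA_COMPATIBILITY_STRATEGY` case itself and the default provider can be fixed independently. The `PolicyAuthorizer` interface and both implementations below are hypothetical and deliberately much smaller than the real `AuthorizationProvider` interface; only the policy name string comes from the diff.

```java
import java.util.Set;

/** Hypothetical, simplified stand-in for a pluggable authorization provider. */
interface PolicyAuthorizer {
    boolean allowPolicyRead(String role, String policyName);
}

/** Assumed default behavior: only admin roles may read any topic policy. */
final class AdminOnlyPolicyAuthorizer implements PolicyAuthorizer {
    private final Set<String> adminRoles;

    AdminOnlyPolicyAuthorizer(Set<String> adminRoles) {
        this.adminRoles = adminRoles;
    }

    @Override
    public boolean allowPolicyRead(String role, String policyName) {
        return adminRoles.contains(role);
    }
}

/** A custom provider that opens one policy to extra roles and delegates the rest. */
final class SchemaAwarePolicyAuthorizer implements PolicyAuthorizer {
    private final PolicyAuthorizer fallback;
    private final Set<String> schemaReaders;

    SchemaAwarePolicyAuthorizer(PolicyAuthorizer fallback, Set<String> schemaReaders) {
        this.fallback = fallback;
        this.schemaReaders = schemaReaders;
    }

    @Override
    public boolean allowPolicyRead(String role, String policyName) {
        // Handle the schema compatibility strategy policy explicitly.
        if ("SCHEMA_COMPATIBILITY_STRATEGY".equals(policyName) && schemaReaders.contains(role)) {
            return true;
        }
        // Everything else keeps the behavior of the wrapped provider.
        return fallback.allowPolicyRead(role, policyName);
    }
}
```

If the broker stopped calling the provider for this policy, a class like `SchemaAwarePolicyAuthorizer` would never get the chance to make that decision, which is the concern raised in the comment.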





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r890823610


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##########
@@ -749,9 +747,7 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyn
     }
 
     protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
-        return validateTopicPolicyOperationAsync(topicName,
-                PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ)

Review Comment:
   > Why should we change this? Users can have their own `AuthorizationProvider` that supports the `SCHEMA_COMPATIBILITY_STRATEGY` operation. Shouldn't we fix the `PulsarAuthorizationProvider` instead?
   
   Yes, we need to do more in `PulsarAuthorizationProvider`.





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r893131263


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+        SchemaInfo readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName, 0);

Review Comment:
   fixed



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));

Review Comment:
   fixed





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r893131085


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);

Review Comment:
   Understood, thanks very much. Fixed.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);

Review Comment:
   fixed





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r893132076


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithProducerConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_CONSUME_TOKEN)
+                .build();
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().createSchema(topicName, si);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "producer+consumer", EnumSet.of(AuthAction.produce, AuthAction.consume));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+        SchemaInfo readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+        try {
+            adminWithoutPermission.schemas().getSchemaInfo(topicName, 0);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        readSi = adminWithProducerConsumePermission.schemas().getSchemaInfo(topicName, 0);
+        assertEquals(readSi, si);
+        List<SchemaInfo> allSchemas = adminWithProducerConsumePermission.schemas().getAllSchemas(topicName);
+        assertEquals(allSchemas.size(), 1);
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        try {
+            adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2);
+            fail("Should have failed");
+        } catch (Exception ignore) {
+        }
+        boolean compatibility = adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility();
+        assertTrue(compatibility);
+        try {
+            adminWithoutPermission.schemas().getVersionBySchema(topicName, si);

Review Comment:
   fixed





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#discussion_r891895121


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String PRODUCE_CONSUME_TOKEN = Jwts.builder().setSubject("producer+consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {

Review Comment:
   Ah, other tests already cover the no-auth case, so this test enables auth only to verify that the correct permissions are enforced.





[GitHub] [pulsar] Technoboy- commented on pull request #15956: [modify][admin] Change the permissions of the schema API from Admin to normal produce/consume

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on PR #15956:
URL: https://github.com/apache/pulsar/pull/15956#issuecomment-1149432259

   > @nodece It seems that `PRODUCE` and `CONSUME` should include the `GET_SCHEMA` permission. Otherwise `PRODUCE` and `CONSUME` would not work well on their own, and users would need to grant `PRODUCE` and `GET_SCHEMA` together before they could produce messages.
   
   Yes, right. So use `GET_METADATA` instead, because a user who has `produce` or `consume` permission can also get the schema.
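
   The rule described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Pulsar's actual authorization code: the class `SchemaPermissionSketch`, its nested enums, and the `isAllowed` method are hypothetical names that merely mirror the `produce`/`consume` actions and the `GET_METADATA` operation mentioned in this thread.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, self-contained model of the permission rule discussed in this
// thread. None of these types are Pulsar classes.
public class SchemaPermissionSketch {

    // Actions that can be granted to a role on a topic (illustrative subset).
    public enum AuthAction { produce, consume }

    // Operations a role may attempt on a topic (illustrative subset).
    public enum TopicOperation { PRODUCE, CONSUME, GET_METADATA }

    // Assumption: reading a schema is treated as a GET_METADATA operation,
    // which is allowed when the role holds produce OR consume.
    public static boolean isAllowed(Set<AuthAction> granted, TopicOperation op) {
        switch (op) {
            case PRODUCE:
                return granted.contains(AuthAction.produce);
            case CONSUME:
                return granted.contains(AuthAction.consume);
            case GET_METADATA:
                return granted.contains(AuthAction.produce)
                        || granted.contains(AuthAction.consume);
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        Set<AuthAction> consumerOnly = EnumSet.of(AuthAction.consume);
        Set<AuthAction> nothing = EnumSet.noneOf(AuthAction.class);

        // A consume-only role can read metadata but cannot produce.
        System.out.println(isAllowed(consumerOnly, TopicOperation.GET_METADATA));
        System.out.println(isAllowed(consumerOnly, TopicOperation.PRODUCE));
        // A role with no grants cannot read metadata.
        System.out.println(isAllowed(nothing, TopicOperation.GET_METADATA));
    }
}
```

   With a mapping like this, a role does not need a separate schema-read grant in addition to `produce` or `consume`, which is the concern raised in the quoted comment.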

