You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 03:42:45 UTC

[pulsar] branch branch-2.10 updated: [fix][admin] Fix producer/consume permission can’t get schema (#15956) (#16026)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new f3b4e8632b6 [fix][admin] Fix producer/consume permission can’t get schema (#15956) (#16026)
f3b4e8632b6 is described below

commit f3b4e8632b6a69e5a175fec472310bf520d98051
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jun 13 11:42:38 2022 +0800

    [fix][admin] Fix producer/consume permission can’t get schema (#15956) (#16026)
    
    Cherry-pick #15956.
    ### Motivation
    Currently, we need admin permissions to operate the schema API. This is because the admin permission was defined when the schema API was first added. See #1381.
    Later, when authentication granularity was added in #6428, the schema API part was not changed, so it still requires the admin permission today.
    
    However, the binary protocol allows roles with the produce/consume permission to get the schema, so this change switches the permission required by the related admin methods to `produce/consume`.
---
 .../broker/admin/impl/SchemasResourceBase.java     |  22 +++-
 .../pulsar/broker/admin/v2/SchemasResource.java    |   6 +-
 .../broker/admin/AdminApiSchemaWithAuthTest.java   | 139 +++++++++++++++++++++
 3 files changed, 160 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 5b119ec881d..304b311cbea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataExcep
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -83,7 +84,7 @@ public class SchemasResourceBase extends AdminResource {
     }
 
     public void getSchema(boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
+        validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
         String schemaId = getSchemaId();
         pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> {
             handleGetSchemaResponse(response, schema, error);
@@ -92,7 +93,7 @@ public class SchemasResourceBase extends AdminResource {
     }
 
     public void getSchema(boolean authoritative, String version, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
+        validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
         String schemaId = getSchemaId();
         ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
         bbVersion.putLong(Long.parseLong(version));
@@ -104,7 +105,7 @@ public class SchemasResourceBase extends AdminResource {
     }
 
     public void getAllSchemas(boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
+        validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
 
         String schemaId = getSchemaId();
         pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> {
@@ -208,7 +209,7 @@ public class SchemasResourceBase extends AdminResource {
     public void getVersionBySchema(
 
             PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
+        validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
 
         String schemaId = getSchemaId();
 
@@ -302,5 +303,18 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
+    private void validateOwnershipAndOperation(boolean authoritative, TopicOperation operation) {
+        try {
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, operation);
+        } catch (RestException e) {
+            if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
+                throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage());
+            } else {
+                throw e;
+            }
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 9e6f7e9f80a..fc30149ccd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -177,7 +177,7 @@ public class SchemasResource extends SchemasResourceBase {
         @PathParam("namespace") String namespace,
         @PathParam("topic") String topic,
         @ApiParam(
-            value = "A JSON value presenting a schema playload. An example of the expected schema can be found down"
+            value = "A JSON value presenting a schema payload. An example of the expected schema can be found down"
                 + " here.",
             examples = @Example(
                 value = @ExampleProperty(
@@ -212,7 +212,7 @@ public class SchemasResource extends SchemasResourceBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") String topic,
             @ApiParam(
-                    value = "A JSON value presenting a schema playload."
+                    value = "A JSON value presenting a schema payload."
                             + " An example of the expected schema can be found down here.",
                     examples = @Example(
                             value = @ExampleProperty(
@@ -249,7 +249,7 @@ public class SchemasResource extends SchemasResourceBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") String topic,
             @ApiParam(
-                    value = "A JSON value presenting a schema playload."
+                    value = "A JSON value presenting a schema payload."
                             + " An example of the expected schema can be found down here.",
                     examples = @Example(
                             value = @ExampleProperty(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
new file mode 100644
index 00000000000..29c0f97e610
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+/**
+ * Unit tests for the schema admin API with authentication and authorization enabled.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+    private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+        conf.setAuthenticationProviders(providers);
+        conf.setSystemTopicEnabled(false);
+        conf.setTopicLevelPoliciesEnabled(false);
+        super.internalSetup();
+
+        PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+                        ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(),
+                        ADMIN_TOKEN);
+        admin = Mockito.spy(pulsarAdminBuilder.build());
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("schematest", tenantInfo);
+        admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetCreateDeleteSchema() throws Exception {
+        String topicName = "persistent://schematest/test/testCreateSchema";
+        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .build();
+        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+                .build();
+        PulsarAdmin adminWithConsumePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN)
+                .build();
+        admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+        admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));
+
+        SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().createSchema(topicName, si));
+        adminWithAdminPermission.schemas().createSchema(topicName, si);
+
+        assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName));
+        SchemaInfo readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName);
+        assertEquals(readSi, si);
+
+        assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName, 0));
+        readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName, 0);
+        assertEquals(readSi, si);
+        List<SchemaInfo> allSchemas = adminWithConsumePermission.schemas().getAllSchemas(topicName);
+        assertEquals(allSchemas.size(), 1);
+
+        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+        assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2));
+        assertTrue(adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility());
+
+        assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getVersionBySchema(topicName, si));
+        Long versionBySchema = adminWithConsumePermission.schemas().getVersionBySchema(topicName, si);
+        assertEquals(versionBySchema, Long.valueOf(0L));
+
+        assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().deleteSchema(topicName));
+        adminWithAdminPermission.schemas().deleteSchema(topicName);
+    }
+}