You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 03:42:45 UTC
[pulsar] branch branch-2.10 updated: [fix][admin] Fix producer/consume permission can’t get schema (#15956) (#16026)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new f3b4e8632b6 [fix][admin] Fix producer/consume permission can’t get schema (#15956) (#16026)
f3b4e8632b6 is described below
commit f3b4e8632b6a69e5a175fec472310bf520d98051
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jun 13 11:42:38 2022 +0800
[fix][admin] Fix producer/consume permission can’t get schema (#15956) (#16026)
Cherry-pick #15956.
### Motivation
Currently, we need admin permissions to operate the schema API. This is because the admin permission was defined when the schema API was first added. See #1381.
Later, then adding authentication granularity with #6428, we don't change the schema API part. So leave the admin permission today.
But the binary protocol allows the produce/consume permission to get the schema, so change the related method permission to `produce/consume`.
---
.../broker/admin/impl/SchemasResourceBase.java | 22 +++-
.../pulsar/broker/admin/v2/SchemasResource.java | 6 +-
.../broker/admin/AdminApiSchemaWithAuthTest.java | 139 +++++++++++++++++++++
3 files changed, 160 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 5b119ec881d..304b311cbea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataExcep
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -83,7 +84,7 @@ public class SchemasResourceBase extends AdminResource {
}
public void getSchema(boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
+ validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> {
handleGetSchemaResponse(response, schema, error);
@@ -92,7 +93,7 @@ public class SchemasResourceBase extends AdminResource {
}
public void getSchema(boolean authoritative, String version, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
+ validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
String schemaId = getSchemaId();
ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
bbVersion.putLong(Long.parseLong(version));
@@ -104,7 +105,7 @@ public class SchemasResourceBase extends AdminResource {
}
public void getAllSchemas(boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
+ validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> {
@@ -208,7 +209,7 @@ public class SchemasResourceBase extends AdminResource {
public void getVersionBySchema(
PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
+ validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
String schemaId = getSchemaId();
@@ -302,5 +303,18 @@ public class SchemasResourceBase extends AdminResource {
}
}
+ private void validateOwnershipAndOperation(boolean authoritative, TopicOperation operation) {
+ try {
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, operation);
+ } catch (RestException e) {
+ if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
+ throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage());
+ } else {
+ throw e;
+ }
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 9e6f7e9f80a..fc30149ccd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -177,7 +177,7 @@ public class SchemasResource extends SchemasResourceBase {
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@ApiParam(
- value = "A JSON value presenting a schema playload. An example of the expected schema can be found down"
+ value = "A JSON value presenting a schema payload. An example of the expected schema can be found down"
+ " here.",
examples = @Example(
value = @ExampleProperty(
@@ -212,7 +212,7 @@ public class SchemasResource extends SchemasResourceBase {
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@ApiParam(
- value = "A JSON value presenting a schema playload."
+ value = "A JSON value presenting a schema payload."
+ " An example of the expected schema can be found down here.",
examples = @Example(
value = @ExampleProperty(
@@ -249,7 +249,7 @@ public class SchemasResource extends SchemasResourceBase {
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@ApiParam(
- value = "A JSON value presenting a schema playload."
+ value = "A JSON value presenting a schema payload."
+ " An example of the expected schema can be found down here.",
examples = @Example(
value = @ExampleProperty(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
new file mode 100644
index 00000000000..29c0f97e610
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+/**
+ * Unit tests for schema admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+ private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+ private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ conf.setAuthorizationEnabled(true);
+ conf.setAuthenticationEnabled(true);
+ conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+ conf.setAuthenticationProviders(providers);
+ conf.setSystemTopicEnabled(false);
+ conf.setTopicLevelPoliciesEnabled(false);
+ super.internalSetup();
+
+ PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+ ? brokerUrl.toString() : brokerUrlTls.toString())
+ .authentication(AuthenticationToken.class.getName(),
+ ADMIN_TOKEN);
+ admin = Mockito.spy(pulsarAdminBuilder.build());
+
+ // Setup namespaces
+ admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("schematest", tenantInfo);
+ admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testGetCreateDeleteSchema() throws Exception {
+ String topicName = "persistent://schematest/test/testCreateSchema";
+ PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+ .build();
+ PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+ .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+ .build();
+ PulsarAdmin adminWithConsumePermission = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+ .authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN)
+ .build();
+ admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+ admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));
+
+ SchemaInfo si = Schema.BOOL.getSchemaInfo();
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().createSchema(topicName, si));
+ adminWithAdminPermission.schemas().createSchema(topicName, si);
+
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName));
+ SchemaInfo readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName);
+ assertEquals(readSi, si);
+
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName, 0));
+ readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName, 0);
+ assertEquals(readSi, si);
+ List<SchemaInfo> allSchemas = adminWithConsumePermission.schemas().getAllSchemas(topicName);
+ assertEquals(allSchemas.size(), 1);
+
+ SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2));
+ assertTrue(adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility());
+
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getVersionBySchema(topicName, si));
+ Long versionBySchema = adminWithConsumePermission.schemas().getVersionBySchema(topicName, si);
+ assertEquals(versionBySchema, Long.valueOf(0L));
+
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().deleteSchema(topicName));
+ adminWithAdminPermission.schemas().deleteSchema(topicName);
+ }
+}