You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/20 08:25:18 UTC
[pulsar] branch branch-2.9 updated: [branch-2.9][cherry-pick] Fix producer/consume permission can’t get schema. (#17730)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new ce2c15782b2 [branch-2.9][cherry-pick] Fix producer/consume permission can’t get schema. (#17730)
ce2c15782b2 is described below
commit ce2c15782b21ae82aacc1be987d037bce9d276f0
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Sep 20 16:25:10 2022 +0800
[branch-2.9][cherry-pick] Fix producer/consume permission can’t get schema. (#17730)
---
.../broker/admin/impl/SchemasResourceBase.java | 22 +++-
.../broker/admin/AdminApiSchemaWithAuthTest.java | 145 +++++++++++++++++++++
2 files changed, 163 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 5b119ec881d..304b311cbea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataExcep
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -83,7 +84,7 @@ public class SchemasResourceBase extends AdminResource {
}
public void getSchema(boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
+ validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> {
handleGetSchemaResponse(response, schema, error);
@@ -92,7 +93,7 @@ public class SchemasResourceBase extends AdminResource {
}
public void getSchema(boolean authoritative, String version, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
+ validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
String schemaId = getSchemaId();
ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
bbVersion.putLong(Long.parseLong(version));
@@ -104,7 +105,7 @@ public class SchemasResourceBase extends AdminResource {
}
public void getAllSchemas(boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
+ validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
String schemaId = getSchemaId();
pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> {
@@ -208,7 +209,7 @@ public class SchemasResourceBase extends AdminResource {
public void getVersionBySchema(
PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
+ validateOwnershipAndOperation(authoritative, TopicOperation.GET_METADATA);
String schemaId = getSchemaId();
@@ -302,5 +303,18 @@ public class SchemasResourceBase extends AdminResource {
}
}
+ private void validateOwnershipAndOperation(boolean authoritative, TopicOperation operation) {
+ try {
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, operation);
+ } catch (RestException e) {
+ if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
+ throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage());
+ } else {
+ throw e;
+ }
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
new file mode 100644
index 00000000000..6e8fa4c8027
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertThrows;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.Base64;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the schema admin API with token authentication and authorization enabled.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
+
+ private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+ private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ conf.setAuthorizationEnabled(true);
+ conf.setAuthenticationEnabled(true);
+ conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+ conf.setAuthenticationProviders(providers);
+ conf.setSystemTopicEnabled(false);
+ conf.setTopicLevelPoliciesEnabled(false);
+ super.internalSetup();
+
+ PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
+ ? brokerUrl.toString() : brokerUrlTls.toString())
+ .authentication(AuthenticationToken.class.getName(),
+ ADMIN_TOKEN);
+ admin = Mockito.spy(pulsarAdminBuilder.build());
+
+ // Set up the cluster, tenant, and namespace used by the test
+ admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("schematest", tenantInfo);
+ admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testGetCreateDeleteSchema() throws Exception {
+ String topicName = "persistent://schematest/test/testCreateSchema";
+ PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+ .build();
+ PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+ .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
+ .build();
+ PulsarAdmin adminWithConsumePermission = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+ .authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN)
+ .build();
+ admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
+ admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));
+
+ SchemaInfo si = Schema.BOOL.getSchemaInfo();
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().createSchema(topicName, si));
+ adminWithAdminPermission.schemas().createSchema(topicName, si);
+
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName));
+ SchemaInfo readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName);
+ ((SchemaInfoImpl) readSi).setTimestamp(0);
+ assertEquals(readSi, si);
+
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName, 0));
+ readSi = adminWithConsumePermission.schemas().getSchemaInfo(topicName, 0);
+ ((SchemaInfoImpl) readSi).setTimestamp(0);
+ assertEquals(readSi, si);
+ List<SchemaInfo> allSchemas = adminWithConsumePermission.schemas().getAllSchemas(topicName);
+ assertEquals(allSchemas.size(), 1);
+
+ SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
+ assertThrows(PulsarAdminException.class, () ->
+ adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2));
+ assertTrue(adminWithAdminPermission.schemas().testCompatibility(topicName, schemaInfo2).isCompatibility());
+
+ assertThrows(PulsarAdminException.class, () ->
+ adminWithoutPermission.schemas().getVersionBySchema(topicName, si));
+ Long versionBySchema = adminWithConsumePermission.schemas().getVersionBySchema(topicName, si);
+ assertEquals(versionBySchema, Long.valueOf(0L));
+
+ assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().deleteSchema(topicName));
+ adminWithAdminPermission.schemas().deleteSchema(topicName);
+ }
+}