You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/19 07:54:23 UTC

[pulsar] branch master updated: [improve][schema] Change update schema auth from tenant to produce (#18074)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 26b47ffbcdc [improve][schema] Change update schema auth from tenant to produce (#18074)
26b47ffbcdc is described below

commit 26b47ffbcdc7f91425ed1ff1cc6cd4d7644a2451
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Oct 19 15:54:10 2022 +0800

    [improve][schema] Change update schema auth from tenant to produce (#18074)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 54 +++++++++++-----------
 .../broker/admin/impl/SchemasResourceBase.java     |  4 +-
 .../broker/admin/AdminApiSchemaWithAuthTest.java   |  9 ++++
 3 files changed, 38 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 645c804af2f..e2c44a80d4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -742,33 +742,7 @@ public abstract class AdminResource extends PulsarWebResource {
         return validateTopicPolicyOperationAsync(topicName,
                 PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
                 PolicyOperation.READ)
-                .thenCompose((__) -> {
-                    CompletableFuture<SchemaCompatibilityStrategy> future;
-                    if (config().isTopicLevelPoliciesEnabled()) {
-                        future = getTopicPoliciesAsyncWithRetry(topicName)
-                                .thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
-                    } else {
-                        future = CompletableFuture.completedFuture(null);
-                    }
-
-                    return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
-                        if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
-                            return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
-                        }
-                        return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
-                            SchemaCompatibilityStrategy schemaCompatibilityStrategy =
-                                    policies.schema_compatibility_strategy;
-                            if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
-                                schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
-                                        policies.schema_auto_update_compatibility_strategy);
-                                if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
-                                    schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
-                                }
-                            }
-                            return schemaCompatibilityStrategy;
-                        });
-                    });
-                }).whenComplete((__, ex) -> {
+                .thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
                     if (ex != null) {
                         log.error("[{}] Failed to get schema compatibility strategy of topic {} {}",
                                 clientAppId(), topicName, ex);
@@ -776,6 +750,32 @@ public abstract class AdminResource extends PulsarWebResource {
                 });
     }
 
+    protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsyncWithoutAuth() {
+        CompletableFuture<SchemaCompatibilityStrategy> future = CompletableFuture.completedFuture(null);
+        if (config().isTopicLevelPoliciesEnabled()) {
+            future = getTopicPoliciesAsyncWithRetry(topicName)
+                    .thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
+        }
+
+        return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
+            if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
+                return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
+            }
+            return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
+                SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+                        policies.schema_compatibility_strategy;
+                if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+                    schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                            policies.schema_auto_update_compatibility_strategy);
+                    if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+                        schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
+                    }
+                }
+                return schemaCompatibilityStrategy;
+            });
+        });
+    }
+
     @CanIgnoreReturnValue
     public static <T> T checkNotNull(T reference) {
         return Objects.requireNonNull(reference);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 0254ff395ba..76af5825143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -114,8 +114,8 @@ public class SchemasResourceBase extends AdminResource {
     }
 
     public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
-                .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
+        return validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE)
+                .thenCompose(__ -> getSchemaCompatibilityStrategyAsyncWithoutAuth())
                 .thenCompose(schemaCompatibilityStrategy -> {
                     byte[] data;
                     if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index 46830b05204..4de4d905e49 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -58,6 +58,8 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
     private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
     private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
 
+    private static final String PRODUCE_TOKEN = Jwts.builder().setSubject("producer").signWith(SECRET_KEY).compact();
+
     @BeforeMethod
     @Override
     public void setup() throws Exception {
@@ -108,11 +110,18 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
                 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
                 .authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN)
                 .build();
+
+        PulsarAdmin adminWithProducePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN)
+                .build();
         admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
         admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));
 
         SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        assertThrows(PulsarAdminException.class, () -> adminWithConsumePermission.schemas().getSchemaInfo(topicName));
         assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().createSchema(topicName, si));
+        adminWithProducePermission.schemas().createSchema(topicName, si);
         adminWithAdminPermission.schemas().createSchema(topicName, si);
 
         assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName));