You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/19 07:54:23 UTC
[pulsar] branch master updated: [improve][schema] Change update schema auth from tenant to produce (#18074)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 26b47ffbcdc [improve][schema] Change update schema auth from tenant to produce (#18074)
26b47ffbcdc is described below
commit 26b47ffbcdc7f91425ed1ff1cc6cd4d7644a2451
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Oct 19 15:54:10 2022 +0800
[improve][schema] Change update schema auth from tenant to produce (#18074)
---
.../apache/pulsar/broker/admin/AdminResource.java | 54 +++++++++++-----------
.../broker/admin/impl/SchemasResourceBase.java | 4 +-
.../broker/admin/AdminApiSchemaWithAuthTest.java | 9 ++++
3 files changed, 38 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 645c804af2f..e2c44a80d4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -742,33 +742,7 @@ public abstract class AdminResource extends PulsarWebResource {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
- .thenCompose((__) -> {
- CompletableFuture<SchemaCompatibilityStrategy> future;
- if (config().isTopicLevelPoliciesEnabled()) {
- future = getTopicPoliciesAsyncWithRetry(topicName)
- .thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
- if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
- return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
- }
- return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
- SchemaCompatibilityStrategy schemaCompatibilityStrategy =
- policies.schema_compatibility_strategy;
- if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
- schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
- policies.schema_auto_update_compatibility_strategy);
- if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
- schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
- }
- }
- return schemaCompatibilityStrategy;
- });
- });
- }).whenComplete((__, ex) -> {
+ .thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility strategy of topic {} {}",
clientAppId(), topicName, ex);
@@ -776,6 +750,32 @@ public abstract class AdminResource extends PulsarWebResource {
});
}
+ protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsyncWithoutAuth() {
+ CompletableFuture<SchemaCompatibilityStrategy> future = CompletableFuture.completedFuture(null);
+ if (config().isTopicLevelPoliciesEnabled()) {
+ future = getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op -> op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
+ }
+
+ return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
+ if (!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
+ return CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
+ }
+ return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
+ SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+ policies.schema_compatibility_strategy;
+ if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+ schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+ policies.schema_auto_update_compatibility_strategy);
+ if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+ schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
+ }
+ }
+ return schemaCompatibilityStrategy;
+ });
+ });
+ }
+
@CanIgnoreReturnValue
public static <T> T checkNotNull(T reference) {
return Objects.requireNonNull(reference);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 0254ff395ba..76af5825143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -114,8 +114,8 @@ public class SchemasResourceBase extends AdminResource {
}
public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
- return validateDestinationAndAdminOperationAsync(authoritative)
- .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
+ return validateOwnershipAndOperationAsync(authoritative, TopicOperation.PRODUCE)
+ .thenCompose(__ -> getSchemaCompatibilityStrategyAsyncWithoutAuth())
.thenCompose(schemaCompatibilityStrategy -> {
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index 46830b05204..4de4d905e49 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -58,6 +58,8 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
+ private static final String PRODUCE_TOKEN = Jwts.builder().setSubject("producer").signWith(SECRET_KEY).compact();
+
@BeforeMethod
@Override
public void setup() throws Exception {
@@ -108,11 +110,18 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
.serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
.authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN)
.build();
+
+ PulsarAdmin adminWithProducePermission = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
+ .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN)
+ .build();
admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));
SchemaInfo si = Schema.BOOL.getSchemaInfo();
+ assertThrows(PulsarAdminException.class, () -> adminWithConsumePermission.schemas().getSchemaInfo(topicName));
assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().createSchema(topicName, si));
+ adminWithProducePermission.schemas().createSchema(topicName, si);
adminWithAdminPermission.schemas().createSchema(topicName, si);
assertThrows(PulsarAdminException.class, () -> adminWithoutPermission.schemas().getSchemaInfo(topicName));