You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2022/08/22 16:07:46 UTC
[pulsar] branch master updated: [Authorization] Fix producer/consume permission can’t get v1/schema (#16018)
This is an automated email from the ASF dual-hosted git repository.
yuruguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95c1778e3c2 [Authorization] Fix producer/consume permission can’t get v1/schema (#16018)
95c1778e3c2 is described below
commit 95c1778e3c2a6254b81a7c8c95315473a713121f
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Tue Aug 23 00:07:35 2022 +0800
[Authorization] Fix producer/consume permission can’t get v1/schema (#16018)
* [Authorization] Fix producer/consume permission can’t get v1/schema
---
.../broker/admin/impl/SchemasResourceBase.java | 207 ---------------------
.../pulsar/broker/admin/v1/SchemasResource.java | 96 +++++++++-
2 files changed, 89 insertions(+), 214 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 175ab5ac27c..a115b26407d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -29,32 +29,23 @@ import java.time.Clock;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
-import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
-import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
-import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
-import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
-import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,33 +79,12 @@ public class SchemasResourceBase extends AdminResource {
}
}
- public void getSchema(boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
- String schemaId = getSchemaId();
- pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> {
- handleGetSchemaResponse(response, schema, error);
- return null;
- });
- }
-
public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenApply(__ -> getSchemaId())
.thenCompose(schemaId -> pulsar().getSchemaRegistryService().getSchema(schemaId));
}
- public void getSchema(boolean authoritative, String version, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
- String schemaId = getSchemaId();
- ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
- bbVersion.putLong(Long.parseLong(version));
- SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
- pulsar().getSchemaRegistryService().getSchema(schemaId, v).handle((schema, error) -> {
- handleGetSchemaResponse(response, schema, error);
- return null;
- });
- }
-
public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative, String version) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenApply(__ -> getSchemaId())
@@ -127,16 +97,6 @@ public class SchemasResourceBase extends AdminResource {
});
}
- public void getAllSchemas(boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
-
- String schemaId = getSchemaId();
- pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> {
- handleGetAllSchemasResponse(response, schema, error);
- return null;
- });
- }
-
public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> {
@@ -145,24 +105,6 @@ public class SchemasResourceBase extends AdminResource {
});
}
- public void deleteSchema(boolean authoritative, AsyncResponse response, boolean force) {
- validateDestinationAndAdminOperation(authoritative);
-
- String schemaId = getSchemaId();
- pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""), force)
- .handle((version, error) -> {
- if (isNull(error)) {
- response.resume(Response.ok()
- .entity(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build())
- .build());
- } else {
- log.error("[{}] Failed to delete schema for topic {}", clientAppId(), topicName, error);
- response.resume(new RestException(error));
- }
- return null;
- });
- }
-
public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
return validateDestinationAndAdminOperationAsync(authoritative)
.thenCompose(__ -> {
@@ -172,61 +114,6 @@ public class SchemasResourceBase extends AdminResource {
});
}
- public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
-
- getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> {
- byte[] data;
- if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
- try {
- data = DefaultImplementation.getDefaultImplementation()
- .convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
- } catch (IOException conversionError) {
- log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, conversionError);
- response.resume(new RestException(conversionError));
- return;
- }
- } else {
- data = payload.getSchema().getBytes(Charsets.UTF_8);
- }
- pulsar().getSchemaRegistryService()
- .putSchemaIfAbsent(getSchemaId(),
- SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
- .type(SchemaType.valueOf(payload.getType())).user(defaultIfEmpty(clientAppId(), ""))
- .props(payload.getProperties()).build(),
- schemaCompatibilityStrategy)
- .thenAccept(version -> response.resume(
- Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
- .exceptionally(error -> {
- Throwable throwable = FutureUtil.unwrapCompletionException(error);
- if (throwable instanceof IncompatibleSchemaException) {
- response.resume(Response
- .status(Response.Status.CONFLICT.getStatusCode(), throwable.getMessage())
- .build());
- } else if (throwable instanceof InvalidSchemaDataException) {
- response.resume(Response.status(422, /* Unprocessable Entity */
- throwable.getMessage()).build());
- } else {
- log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
- response.resume(new RestException(throwable));
- }
- return null;
- });
- }).exceptionally(error -> {
- Throwable throwable = FutureUtil.unwrapCompletionException(error);
- if (throwable instanceof RestException) {
- // Unprocessable Entity
- response.resume(Response
- .status(((RestException) throwable).getResponse().getStatus(), throwable.getMessage())
- .build());
- } else {
- log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
- response.resume(new RestException(throwable));
- }
- return null;
- });
- }
-
public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
return validateDestinationAndAdminOperationAsync(authoritative)
.thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
@@ -254,27 +141,6 @@ public class SchemasResourceBase extends AdminResource {
});
}
- public void testCompatibility(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
-
- String schemaId = getSchemaId();
-
- getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar()
- .getSchemaRegistryService().isCompatible(schemaId,
- SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
- .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
- .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
- schemaCompatibilityStrategy)
- .thenAccept(isCompatible -> response.resume(Response.accepted()
- .entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
- .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
- .build())))
- .exceptionally(error -> {
- response.resume(new RestException(FutureUtil.unwrapCompletionException(error)));
- return null;
- });
- }
-
public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompatibilityAsync(
PostSchemaPayload payload, boolean authoritative) {
return validateDestinationAndAdminOperationAsync(authoritative)
@@ -292,26 +158,6 @@ public class SchemasResourceBase extends AdminResource {
});
}
- public void getVersionBySchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
- validateDestinationAndAdminOperation(authoritative);
-
- String schemaId = getSchemaId();
-
- pulsar().getSchemaRegistryService()
- .findSchemaVersion(schemaId,
- SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
- .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
- .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build())
- .thenAccept(version -> response.resume(Response.accepted()
- .entity(LongSchemaVersionResponse.builder().version(version).build()).build()))
- .exceptionally(error -> {
- Throwable throwable = FutureUtil.unwrapCompletionException(error);
- log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, throwable);
- response.resume(new RestException(throwable));
- return null;
- });
- }
-
public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> {
@@ -349,25 +195,6 @@ public class SchemasResourceBase extends AdminResource {
}
}
- protected static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
- if (isNull(error)) {
- if (isNull(schema)) {
- response.resume(Response.status(
- Response.Status.NOT_FOUND.getStatusCode(), "Schema not found").build());
- } else if (schema.schema.isDeleted()) {
- response.resume(Response.status(
- Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted").build());
- } else {
- response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
- .entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
- }
- } else {
- log.error("Failed to get schema", error);
- response.resume(new RestException(error));
- }
-
- }
-
protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata schema) {
if (isNull(schema)) {
throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema not found");
@@ -389,40 +216,6 @@ public class SchemasResourceBase extends AdminResource {
}
}
- private static void handleGetAllSchemasResponse(AsyncResponse response, List<SchemaAndMetadata> schemas,
- Throwable error) {
- if (isNull(error)) {
- if (isNull(schemas)) {
- response.resume(Response.status(
- Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found").build());
- } else {
- response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
- .entity(GetAllVersionsSchemaResponse.builder()
- .getSchemaResponses(schemas.stream()
- .map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
- .collect(Collectors.toList()))
- .build())
- .build());
- }
- } else {
- log.error("Failed to get all schemas", error);
- response.resume(new RestException(error));
- }
- }
-
- private void validateDestinationAndAdminOperation(boolean authoritative) {
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- } catch (RestException e) {
- if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
- throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage());
- } else {
- throw e;
- }
- }
- }
-
private CompletableFuture<Void> validateDestinationAndAdminOperationAsync(boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateAdminAccessForTenantAsync(topicName.getTenant()));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
index c6e4239a3aa..13bfba35121 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
@@ -38,14 +38,20 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
+import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
@Path("/schemas")
@Api(
@@ -53,6 +59,7 @@ import org.apache.pulsar.common.schema.LongSchemaVersion;
description = "Schemas related admin APIs",
tags = "schemas"
)
+@Slf4j
public class SchemasResource extends SchemasResourceBase {
public SchemasResource() {
@@ -81,7 +88,16 @@ public class SchemasResource extends SchemasResourceBase {
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, cluster, namespace, topic);
- getSchema(authoritative, response);
+ getSchemaAsync(authoritative)
+ .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata))
+ .thenApply(response::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get schema for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
@@ -107,7 +123,17 @@ public class SchemasResource extends SchemasResourceBase {
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, cluster, namespace, topic);
- getSchema(authoritative, version, response);
+ getSchemaAsync(authoritative, version)
+ .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata))
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get schema for topic {} with version {}",
+ clientAppId(), topicName, version, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
@@ -132,7 +158,16 @@ public class SchemasResource extends SchemasResourceBase {
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, cluster, namespace, topic);
- getAllSchemas(authoritative, response);
+ getAllSchemasAsync(authoritative)
+ .thenApply(schemaAndMetadata -> convertToAllVersionsSchemaResponse(schemaAndMetadata))
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get all schemas for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@DELETE
@@ -157,7 +192,17 @@ public class SchemasResource extends SchemasResourceBase {
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, cluster, namespace, topic);
- deleteSchema(authoritative, response, force);
+ deleteSchemaAsync(authoritative, force)
+ .thenAccept(version -> {
+ response.resume(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to delete schemas for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@POST
@@ -195,7 +240,25 @@ public class SchemasResource extends SchemasResourceBase {
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, cluster, namespace, topic);
- postSchema(payload, authoritative, response);
+ postSchemaAsync(payload, authoritative)
+ .thenAccept(version -> response.resume(PostSchemaResponse.builder().version(version).build()))
+ .exceptionally(ex -> {
+ Throwable root = FutureUtil.unwrapCompletionException(ex);
+ if (root instanceof IncompatibleSchemaException) {
+ response.resume(Response
+ .status(Response.Status.CONFLICT.getStatusCode(), root.getMessage())
+ .build());
+ } else if (root instanceof InvalidSchemaDataException) {
+ response.resume(Response.status(422, /* Unprocessable Entity */
+ root.getMessage()).build());
+ } else {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to post schemas for topic {}", clientAppId(), topicName, root);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ }
+ return null;
+ });
}
@POST
@@ -232,7 +295,18 @@ public class SchemasResource extends SchemasResourceBase {
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, cluster, namespace, topic);
- testCompatibility(payload, authoritative, response);
+ testCompatibilityAsync(payload, authoritative)
+ .thenAccept(pair -> response.resume(Response.accepted()
+ .entity(IsCompatibilityResponse.builder().isCompatibility(pair.getLeft())
+ .schemaCompatibilityStrategy(pair.getRight().name()).build())
+ .build()))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to test compatibility for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@POST
@@ -270,6 +344,14 @@ public class SchemasResource extends SchemasResourceBase {
@Suspended final AsyncResponse response
) {
validateTopicName(tenant, cluster, namespace, topic);
- getVersionBySchema(payload, authoritative, response);
+ getVersionBySchemaAsync(payload, authoritative)
+ .thenAccept(version -> response.resume(LongSchemaVersionResponse.builder().version(version).build()))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
}