You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2022/08/22 16:07:46 UTC

[pulsar] branch master updated: [Authorization] Fix producer/consume permission can’t get v1/schema (#16018)

This is an automated email from the ASF dual-hosted git repository.

yuruguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95c1778e3c2 [Authorization] Fix producer/consume permission can’t get v1/schema (#16018)
95c1778e3c2 is described below

commit 95c1778e3c2a6254b81a7c8c95315473a713121f
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Tue Aug 23 00:07:35 2022 +0800

    [Authorization] Fix producer/consume permission can’t get v1/schema (#16018)
    
    * [Authorization] Fix clients that hold only produce/consume permission being unable to get the schema through the v1 admin API
---
 .../broker/admin/impl/SchemasResourceBase.java     | 207 ---------------------
 .../pulsar/broker/admin/v1/SchemasResource.java    |  96 +++++++++-
 2 files changed, 89 insertions(+), 214 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 175ab5ac27c..a115b26407d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -29,32 +29,23 @@ import java.time.Clock;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
-import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
-import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
-import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
-import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
-import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,33 +79,12 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
-    public void getSchema(boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-        String schemaId = getSchemaId();
-        pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) -> {
-            handleGetSchemaResponse(response, schema, error);
-            return null;
-        });
-    }
-
     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
         return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                 .thenApply(__ -> getSchemaId())
                 .thenCompose(schemaId -> pulsar().getSchemaRegistryService().getSchema(schemaId));
     }
 
-    public void getSchema(boolean authoritative, String version, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-        String schemaId = getSchemaId();
-        ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
-        bbVersion.putLong(Long.parseLong(version));
-        SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
-        pulsar().getSchemaRegistryService().getSchema(schemaId, v).handle((schema, error) -> {
-            handleGetSchemaResponse(response, schema, error);
-            return null;
-        });
-    }
-
     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative, String version) {
         return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                 .thenApply(__ -> getSchemaId())
@@ -127,16 +97,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void getAllSchemas(boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        String schemaId = getSchemaId();
-        pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema, error) -> {
-            handleGetAllSchemasResponse(response, schema, error);
-            return null;
-        });
-    }
-
     public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean authoritative) {
         return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                 .thenCompose(__ -> {
@@ -145,24 +105,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void deleteSchema(boolean authoritative, AsyncResponse response, boolean force) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        String schemaId = getSchemaId();
-        pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""), force)
-                .handle((version, error) -> {
-                    if (isNull(error)) {
-                        response.resume(Response.ok()
-                                .entity(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build())
-                                .build());
-                    } else {
-                        log.error("[{}] Failed to delete schema for topic {}", clientAppId(), topicName, error);
-                        response.resume(new RestException(error));
-                    }
-                    return null;
-                });
-    }
-
     public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
         return validateDestinationAndAdminOperationAsync(authoritative)
                 .thenCompose(__ -> {
@@ -172,61 +114,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> {
-            byte[] data;
-            if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
-                try {
-                    data = DefaultImplementation.getDefaultImplementation()
-                            .convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
-                } catch (IOException conversionError) {
-                    log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, conversionError);
-                    response.resume(new RestException(conversionError));
-                    return;
-                }
-            } else {
-                data = payload.getSchema().getBytes(Charsets.UTF_8);
-            }
-            pulsar().getSchemaRegistryService()
-                    .putSchemaIfAbsent(getSchemaId(),
-                            SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
-                                    .type(SchemaType.valueOf(payload.getType())).user(defaultIfEmpty(clientAppId(), ""))
-                                    .props(payload.getProperties()).build(),
-                            schemaCompatibilityStrategy)
-                    .thenAccept(version -> response.resume(
-                            Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
-                    .exceptionally(error -> {
-                        Throwable throwable = FutureUtil.unwrapCompletionException(error);
-                        if (throwable instanceof IncompatibleSchemaException) {
-                            response.resume(Response
-                                    .status(Response.Status.CONFLICT.getStatusCode(), throwable.getMessage())
-                                    .build());
-                        } else if (throwable instanceof InvalidSchemaDataException) {
-                            response.resume(Response.status(422, /* Unprocessable Entity */
-                                    throwable.getMessage()).build());
-                        } else {
-                            log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
-                            response.resume(new RestException(throwable));
-                        }
-                        return null;
-                    });
-        }).exceptionally(error -> {
-            Throwable throwable = FutureUtil.unwrapCompletionException(error);
-            if (throwable instanceof RestException) {
-                // Unprocessable Entity
-                response.resume(Response
-                        .status(((RestException) throwable).getResponse().getStatus(), throwable.getMessage())
-                        .build());
-            } else {
-                log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
-                response.resume(new RestException(throwable));
-            }
-            return null;
-        });
-    }
-
     public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
         return validateDestinationAndAdminOperationAsync(authoritative)
                 .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
@@ -254,27 +141,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void testCompatibility(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        String schemaId = getSchemaId();
-
-        getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar()
-                        .getSchemaRegistryService().isCompatible(schemaId,
-                                SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
-                                        .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
-                                        .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
-                                schemaCompatibilityStrategy)
-                        .thenAccept(isCompatible -> response.resume(Response.accepted()
-                                .entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
-                                        .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
-                                .build())))
-                .exceptionally(error -> {
-                    response.resume(new RestException(FutureUtil.unwrapCompletionException(error)));
-                    return null;
-                });
-    }
-
     public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompatibilityAsync(
             PostSchemaPayload payload, boolean authoritative) {
         return validateDestinationAndAdminOperationAsync(authoritative)
@@ -292,26 +158,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void getVersionBySchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        String schemaId = getSchemaId();
-
-        pulsar().getSchemaRegistryService()
-                .findSchemaVersion(schemaId,
-                        SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
-                                .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
-                                .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build())
-                .thenAccept(version -> response.resume(Response.accepted()
-                        .entity(LongSchemaVersionResponse.builder().version(version).build()).build()))
-                .exceptionally(error -> {
-                    Throwable throwable = FutureUtil.unwrapCompletionException(error);
-                    log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, throwable);
-                    response.resume(new RestException(throwable));
-                    return null;
-                });
-    }
-
     public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) {
         return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                 .thenCompose(__ -> {
@@ -349,25 +195,6 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
-    protected static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
-        if (isNull(error)) {
-            if (isNull(schema)) {
-                response.resume(Response.status(
-                        Response.Status.NOT_FOUND.getStatusCode(), "Schema not found").build());
-            } else if (schema.schema.isDeleted()) {
-                response.resume(Response.status(
-                        Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted").build());
-            } else {
-                response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
-                        .entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
-            }
-        } else {
-            log.error("Failed to get schema", error);
-            response.resume(new RestException(error));
-        }
-
-    }
-
     protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata schema) {
         if (isNull(schema)) {
             throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema not found");
@@ -389,40 +216,6 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
-    private static void handleGetAllSchemasResponse(AsyncResponse response, List<SchemaAndMetadata> schemas,
-            Throwable error) {
-        if (isNull(error)) {
-            if (isNull(schemas)) {
-                response.resume(Response.status(
-                        Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found").build());
-            } else {
-                response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
-                        .entity(GetAllVersionsSchemaResponse.builder()
-                                .getSchemaResponses(schemas.stream()
-                                        .map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
-                                        .collect(Collectors.toList()))
-                                .build())
-                        .build());
-            }
-        } else {
-            log.error("Failed to get all schemas", error);
-            response.resume(new RestException(error));
-        }
-    }
-
-    private void validateDestinationAndAdminOperation(boolean authoritative) {
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-        } catch (RestException e) {
-            if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
-                throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage());
-            } else {
-                throw e;
-            }
-        }
-    }
-
     private CompletableFuture<Void> validateDestinationAndAdminOperationAsync(boolean authoritative) {
         return validateTopicOwnershipAsync(topicName, authoritative)
                 .thenCompose(__ -> validateAdminAccessForTenantAsync(topicName.getTenant()));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
index c6e4239a3aa..13bfba35121 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
@@ -38,14 +38,20 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
+import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
 
 @Path("/schemas")
 @Api(
@@ -53,6 +59,7 @@ import org.apache.pulsar.common.schema.LongSchemaVersion;
     description = "Schemas related admin APIs",
     tags = "schemas"
 )
+@Slf4j
 public class SchemasResource extends SchemasResourceBase {
 
     public SchemasResource() {
@@ -81,7 +88,16 @@ public class SchemasResource extends SchemasResourceBase {
         @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        getSchema(authoritative, response);
+        getSchemaAsync(authoritative)
+                .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata))
+                .thenApply(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get schema for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -107,7 +123,17 @@ public class SchemasResource extends SchemasResourceBase {
         @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        getSchema(authoritative, version, response);
+        getSchemaAsync(authoritative, version)
+                .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata))
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get schema for topic {} with version {}",
+                                clientAppId(), topicName, version, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -132,7 +158,16 @@ public class SchemasResource extends SchemasResourceBase {
             @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        getAllSchemas(authoritative, response);
+        getAllSchemasAsync(authoritative)
+                .thenApply(schemaAndMetadata -> convertToAllVersionsSchemaResponse(schemaAndMetadata))
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get all schemas for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -157,7 +192,17 @@ public class SchemasResource extends SchemasResourceBase {
         @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        deleteSchema(authoritative, response, force);
+        deleteSchemaAsync(authoritative, force)
+                .thenAccept(version -> {
+                    response.resume(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to delete schemas for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -195,7 +240,25 @@ public class SchemasResource extends SchemasResourceBase {
         @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        postSchema(payload, authoritative, response);
+        postSchemaAsync(payload, authoritative)
+                .thenAccept(version -> response.resume(PostSchemaResponse.builder().version(version).build()))
+                .exceptionally(ex -> {
+                    Throwable root = FutureUtil.unwrapCompletionException(ex);
+                    if (root instanceof IncompatibleSchemaException) {
+                        response.resume(Response
+                                .status(Response.Status.CONFLICT.getStatusCode(), root.getMessage())
+                                .build());
+                    } else if (root instanceof InvalidSchemaDataException) {
+                        response.resume(Response.status(422, /* Unprocessable Entity */
+                                root.getMessage()).build());
+                    } else {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to post schemas for topic {}", clientAppId(), topicName, root);
+                        }
+                        resumeAsyncResponseExceptionally(response, ex);
+                    }
+                    return null;
+                });
     }
 
     @POST
@@ -232,7 +295,18 @@ public class SchemasResource extends SchemasResourceBase {
             @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        testCompatibility(payload, authoritative, response);
+        testCompatibilityAsync(payload, authoritative)
+                .thenAccept(pair -> response.resume(Response.accepted()
+                        .entity(IsCompatibilityResponse.builder().isCompatibility(pair.getLeft())
+                                .schemaCompatibilityStrategy(pair.getRight().name()).build())
+                        .build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to test compatibility for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -270,6 +344,14 @@ public class SchemasResource extends SchemasResourceBase {
             @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        getVersionBySchema(payload, authoritative, response);
+        getVersionBySchemaAsync(payload, authoritative)
+                .thenAccept(version -> response.resume(LongSchemaVersionResponse.builder().version(version).build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 }