You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/06/02 01:56:08 UTC

[pulsar] branch master updated: [improve][broker] Make some methods in SchemasResourceBase async. (#15821)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bcb921d973 [improve][broker] Make some methods in SchemasResourceBase async. (#15821)
8bcb921d973 is described below

commit 8bcb921d973c96ea09d885921186e4040304ebb1
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Jun 2 09:56:00 2022 +0800

    [improve][broker] Make some methods in SchemasResourceBase async. (#15821)
    
    ### Motivation
    See PIP #14365  and change tracker #15043.
    
     Make `getSchema` /  `deleteSchema` / `postSchema` / `getVersionBySchema` / `testCompatibility` methods in SchemasResourceBase to pure async.
---
 .../broker/admin/impl/SchemasResourceBase.java     | 129 ++++++++++++-
 .../pulsar/broker/admin/v2/SchemasResource.java    | 199 +++++++++++++--------
 .../pulsar/broker/admin/AdminApiSchemaTest.java    |   2 +-
 3 files changed, 253 insertions(+), 77 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 761b7727532..88487c11296 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -27,17 +27,21 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Clock;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -67,7 +71,7 @@ public class SchemasResourceBase extends AdminResource {
         this.clock = clock;
     }
 
-    private static long getLongSchemaVersion(SchemaVersion schemaVersion) {
+    protected static long getLongSchemaVersion(SchemaVersion schemaVersion) {
         if (schemaVersion instanceof LongSchemaVersion) {
             return ((LongSchemaVersion) schemaVersion).getVersion();
         } else {
@@ -92,6 +96,12 @@ public class SchemasResourceBase extends AdminResource {
         });
     }
 
+    public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
+        return validateDestinationAndAdminOperationAsync(authoritative)
+                .thenApply(__ -> getSchemaId())
+                .thenCompose(schemaId -> pulsar().getSchemaRegistryService().getSchema(schemaId));
+    }
+
     public void getSchema(boolean authoritative, String version, AsyncResponse response) {
         validateDestinationAndAdminOperation(authoritative);
         String schemaId = getSchemaId();
@@ -104,6 +114,18 @@ public class SchemasResourceBase extends AdminResource {
         });
     }
 
+    public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative, String version) {
+        return validateDestinationAndAdminOperationAsync(authoritative)
+                .thenApply(__ -> getSchemaId())
+                .thenCompose(schemaId -> {
+                    ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
+                    bbVersion.putLong(Long.parseLong(version));
+                    SchemaRegistryService schemaRegistryService = pulsar().getSchemaRegistryService();
+                    SchemaVersion schemaVersion = schemaRegistryService.versionFromBytes(bbVersion.array());
+                    return schemaRegistryService.getSchema(schemaId, schemaVersion);
+                });
+    }
+
     public void getAllSchemas(boolean authoritative, AsyncResponse response) {
         validateDestinationAndAdminOperation(authoritative);
 
@@ -114,6 +136,14 @@ public class SchemasResourceBase extends AdminResource {
         });
     }
 
+    public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean authoritative) {
+        return validateDestinationAndAdminOperationAsync(authoritative)
+                .thenCompose(__ -> {
+                    String schemaId = getSchemaId();
+                    return pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId);
+                });
+    }
+
     public void deleteSchema(boolean authoritative, AsyncResponse response, boolean force) {
         validateDestinationAndAdminOperation(authoritative);
 
@@ -132,6 +162,15 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
+    public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
+        return validateDestinationAndAdminOperationAsync(authoritative)
+                .thenCompose(__ -> {
+                    String schemaId = getSchemaId();
+                    return pulsar().getSchemaRegistryService()
+                            .deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""), force);
+                });
+    }
+
     public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
         validateDestinationAndAdminOperation(authoritative);
 
@@ -187,6 +226,33 @@ public class SchemasResourceBase extends AdminResource {
         });
     }
 
+    public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
+        return validateDestinationAndAdminOperationAsync(authoritative)
+                .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
+                .thenCompose(schemaCompatibilityStrategy -> {
+                    byte[] data;
+                    if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
+                        try {
+                            data = DefaultImplementation.getDefaultImplementation()
+                                    .convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema()
+                                            .getBytes(Charsets.UTF_8));
+                        } catch (IOException conversionError) {
+                            throw new RestException(conversionError);
+                        }
+                    } else {
+                        data = payload.getSchema().getBytes(Charsets.UTF_8);
+                    }
+                    return pulsar().getSchemaRegistryService()
+                            .putSchemaIfAbsent(getSchemaId(),
+                                    SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
+                                            .type(SchemaType.valueOf(payload.getType()))
+                                            .user(defaultIfEmpty(clientAppId(), ""))
+                                            .props(payload.getProperties())
+                                            .build(),
+                                    schemaCompatibilityStrategy);
+                });
+    }
+
     public void testCompatibility(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
         validateDestinationAndAdminOperation(authoritative);
 
@@ -208,9 +274,24 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void getVersionBySchema(
+    public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompatibilityAsync(
+            PostSchemaPayload payload, boolean authoritative) {
+        return validateDestinationAndAdminOperationAsync(authoritative)
+                .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
+                .thenCompose(strategy -> {
+                    String schemaId = getSchemaId();
+                    return pulsar().getSchemaRegistryService().isCompatible(schemaId,
+                            SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8))
+                                    .isDeleted(false)
+                                    .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
+                                    .user(defaultIfEmpty(clientAppId(), ""))
+                                    .props(payload.getProperties())
+                                    .build(), strategy)
+                            .thenApply(v -> Pair.of(v, strategy));
+                });
+    }
 
-            PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
+    public void getVersionBySchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
         validateDestinationAndAdminOperation(authoritative);
 
         String schemaId = getSchemaId();
@@ -230,6 +311,20 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
+    public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) {
+        return validateDestinationAndAdminOperationAsync(authoritative)
+                .thenCompose(__ -> {
+                    String schemaId = getSchemaId();
+                    return pulsar().getSchemaRegistryService()
+                            .findSchemaVersion(schemaId,
+                                    SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8))
+                                            .isDeleted(false).timestamp(clock.millis())
+                                            .type(SchemaType.valueOf(payload.getType()))
+                                            .user(defaultIfEmpty(clientAppId(), ""))
+                                            .props(payload.getProperties()).build());
+                });
+    }
+
     @Override
     protected String domain() {
         return "persistent";
@@ -253,7 +348,7 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
-    private static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
+    protected static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
         if (isNull(error)) {
             if (isNull(schema)) {
                 response.resume(Response.status(
@@ -272,6 +367,27 @@ public class SchemasResourceBase extends AdminResource {
 
     }
 
+    protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata schema) {
+        if (isNull(schema)) {
+            throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema not found");
+        } else if (schema.schema.isDeleted()) {
+            throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted");
+        }
+        return convertSchemaAndMetadataToGetSchemaResponse(schema);
+    }
+
+    protected GetAllVersionsSchemaResponse convertToAllVersionsSchemaResponse(List<SchemaAndMetadata> schemas) {
+        if (isNull(schemas)) {
+            throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found");
+        } else {
+            return GetAllVersionsSchemaResponse.builder()
+                    .getSchemaResponses(schemas.stream()
+                            .map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
+                            .collect(Collectors.toList()))
+                    .build();
+        }
+    }
+
     private static void handleGetAllSchemasResponse(AsyncResponse response, List<SchemaAndMetadata> schemas,
             Throwable error) {
         if (isNull(error)) {
@@ -306,5 +422,10 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
+    private CompletableFuture<Void> validateDestinationAndAdminOperationAsync(boolean authoritative) {
+        return validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateAdminAccessForTenantAsync(topicName.getTenant()));
+    }
+
     private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 1ac06057429..e75eed8cd7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -39,14 +39,20 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
+import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
 
 @Path("/schemas")
 @Api(
@@ -54,6 +60,7 @@ import org.apache.pulsar.common.schema.LongSchemaVersion;
     description = "Schemas related admin APIs",
     tags = "schemas"
 )
+@Slf4j
 public class SchemasResource extends SchemasResourceBase {
 
     @VisibleForTesting
@@ -75,14 +82,22 @@ public class SchemasResource extends SchemasResourceBase {
             @ApiResponse(code = 500, message = "Internal Server Error"),
     })
     public void getSchema(
-        @PathParam("tenant") String tenant,
-        @PathParam("namespace") String namespace,
-        @PathParam("topic") String topic,
-        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-        @Suspended final AsyncResponse response
-    ) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @Suspended final AsyncResponse response) {
         validateTopicName(tenant, namespace, topic);
-        getSchema(authoritative, response);
+        getSchemaAsync(authoritative)
+                .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata))
+                .thenApply(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get schema for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -99,15 +114,24 @@ public class SchemasResource extends SchemasResourceBase {
             @ApiResponse(code = 500, message = "Internal Server Error"),
     })
     public void getSchema(
-        @PathParam("tenant") String tenant,
-        @PathParam("namespace") String namespace,
-        @PathParam("topic") String topic,
-        @PathParam("version") @Encoded String version,
-        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-        @Suspended final AsyncResponse response
-    ) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @PathParam("version") @Encoded String version,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @Suspended final AsyncResponse response) {
         validateTopicName(tenant, namespace, topic);
-        getSchema(authoritative, version, response);
+        getSchemaAsync(authoritative, version)
+                .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata))
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get schema for topic {} with version {}",
+                                clientAppId(), topicName, version, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -128,10 +152,18 @@ public class SchemasResource extends SchemasResourceBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") String topic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @Suspended final AsyncResponse response
-    ) {
+            @Suspended final AsyncResponse response) {
         validateTopicName(tenant, namespace, topic);
-        getAllSchemas(authoritative, response);
+        getAllSchemasAsync(authoritative)
+                .thenApply(schemaAndMetadata -> convertToAllVersionsSchemaResponse(schemaAndMetadata))
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get all schemas for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -147,15 +179,24 @@ public class SchemasResource extends SchemasResourceBase {
         @ApiResponse(code = 500, message = "Internal Server Error"),
     })
     public void deleteSchema(
-        @PathParam("tenant") String tenant,
-        @PathParam("namespace") String namespace,
-        @PathParam("topic") String topic,
-        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-        @QueryParam("force") @DefaultValue("false") boolean force,
-        @Suspended final AsyncResponse response
-    ) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("force") @DefaultValue("false") boolean force,
+            @Suspended final AsyncResponse response) {
         validateTopicName(tenant, namespace, topic);
-        deleteSchema(authoritative, response, force);
+        deleteSchemaAsync(authoritative, force)
+                .thenAccept(version -> {
+                    response.resume(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to delete schemas for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -174,25 +215,36 @@ public class SchemasResource extends SchemasResourceBase {
         @ApiResponse(code = 500, message = "Internal Server Error"),
     })
     public void postSchema(
-        @PathParam("tenant") String tenant,
-        @PathParam("namespace") String namespace,
-        @PathParam("topic") String topic,
-        @ApiParam(
-            value = "A JSON value presenting a schema playload. An example of the expected schema can be found down"
-                + " here.",
-            examples = @Example(
-                value = @ExampleProperty(
-                    mediaType = MediaType.APPLICATION_JSON,
-                    value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }"
-                )
-            )
-        )
-        PostSchemaPayload payload,
-        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-        @Suspended final AsyncResponse response
-    ) {
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @ApiParam(value = "A JSON value presenting a schema playload."
+                    + " An example of the expected schema can be found down here.",
+               examples = @Example(value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
+               value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }")))
+            PostSchemaPayload payload,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @Suspended final AsyncResponse response) {
         validateTopicName(tenant, namespace, topic);
-        postSchema(payload, authoritative, response);
+        postSchemaAsync(payload, authoritative)
+                .thenAccept(version -> response.resume(PostSchemaResponse.builder().version(version).build()))
+                .exceptionally(ex -> {
+                    Throwable root = FutureUtil.unwrapCompletionException(ex);
+                    if (root instanceof IncompatibleSchemaException) {
+                        response.resume(Response
+                                .status(Response.Status.CONFLICT.getStatusCode(), root.getMessage())
+                                .build());
+                    } else if (root instanceof InvalidSchemaDataException) {
+                        response.resume(Response.status(422, /* Unprocessable Entity */
+                                root.getMessage()).build());
+                    } else {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to post schemas for topic {}", clientAppId(), topicName, root);
+                        }
+                        resumeAsyncResponseExceptionally(response, ex);
+                    }
+                    return null;
+                });
     }
 
     @POST
@@ -212,23 +264,26 @@ public class SchemasResource extends SchemasResourceBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") String topic,
-            @ApiParam(
-                    value = "A JSON value presenting a schema playload."
+            @ApiParam(value = "A JSON value presenting a schema playload."
                             + " An example of the expected schema can be found down here.",
-                    examples = @Example(
-                            value = @ExampleProperty(
-                                    mediaType = MediaType.APPLICATION_JSON,
-                                    value = "{\"type\": \"STRING\", \"schema\": \"\","
-                                            + " \"properties\": { \"key1\" : \"value1\" + } }"
-                            )
-                    )
-            )
-                    PostSchemaPayload payload,
+             examples = @Example(value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
+             value = "{\"type\": \"STRING\", \"schema\": \"\"," + " \"properties\": { \"key1\" : \"value1\" + } }")))
+            PostSchemaPayload payload,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @Suspended final AsyncResponse response
-    ) {
+            @Suspended final AsyncResponse response) {
         validateTopicName(tenant, namespace, topic);
-        testCompatibility(payload, authoritative, response);
+        testCompatibilityAsync(payload, authoritative)
+                .thenAccept(pair -> response.resume(Response.accepted()
+                        .entity(IsCompatibilityResponse.builder().isCompatibility(pair.getLeft())
+                                .schemaCompatibilityStrategy(pair.getRight().name()).build())
+                        .build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to test compatibility for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -249,22 +304,22 @@ public class SchemasResource extends SchemasResourceBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") String topic,
-            @ApiParam(
-                    value = "A JSON value presenting a schema playload."
+            @ApiParam(value = "A JSON value presenting a schema playload."
                             + " An example of the expected schema can be found down here.",
-                    examples = @Example(
-                            value = @ExampleProperty(
-                                    mediaType = MediaType.APPLICATION_JSON,
-                                    value = "{\"type\": \"STRING\", \"schema\": \"\","
-                                            + " \"properties\": { \"key1\" : \"value1\" + } }"
-                            )
-                    )
-            )
-                    PostSchemaPayload payload,
+            examples = @Example(value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
+            value = "{\"type\": \"STRING\", \"schema\": \"\"," + " \"properties\": { \"key1\" : \"value1\" + } }")))
+            PostSchemaPayload payload,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @Suspended final AsyncResponse response
-    ) {
+            @Suspended final AsyncResponse response) {
         validateTopicName(tenant, namespace, topic);
-        getVersionBySchema(payload, authoritative, response);
+        getVersionBySchemaAsync(payload, authoritative)
+                .thenAccept(version -> response.resume(LongSchemaVersionResponse.builder().version(version).build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index cd1a3b781ac..acbe1f7c246 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -212,7 +212,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
             admin.schemas().createSchema(topicName, fooSchemaInfo);
             fail("Should have failed");
         } catch (PulsarAdminException.NotFoundException e) {
-            assertTrue(e.getMessage().contains("HTTP 404"));
+            assertTrue(e.getMessage().contains("Namespace does not exist"));
         }
     }