You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/30 16:47:25 UTC

[pulsar] branch master updated: Add the schema admin api (#4800)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d19575  Add the schema admin api (#4800)
8d19575 is described below

commit 8d19575358c5fbf6d777ff922e8d205bd3254a49
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jul 31 00:47:17 2019 +0800

    Add the schema admin api (#4800)
    
    ### Motivation
    Continue the issues of #4782
    
    ### Verifying this change
    Add the tests for it
---
 .../pulsar/broker/admin/v2/SchemasResource.java    | 193 ++++++++++++++++++++-
 .../schema/DefaultSchemaRegistryService.java       |  10 ++
 .../broker/service/schema/SchemaRegistry.java      |   4 +
 .../service/schema/SchemaRegistryService.java      |   1 +
 .../service/schema/SchemaRegistryServiceImpl.java  |  16 +-
 ...hemaRegistryServiceWithSchemaDataValidator.java |  10 ++
 .../broker/service/schema/SchemaServiceTest.java   |  35 ++++
 .../org/apache/pulsar/client/admin/Schemas.java    |  50 ++++++
 .../pulsar/client/admin/internal/SchemasImpl.java  | 102 ++++++++++-
 .../schema/GetAllVersionsSchemaResponse.java       |  34 ++++
 .../protocol/schema/IsCompatibilityResponse.java   |  34 ++++
 .../protocol/schema/LongSchemaVersionResponse.java |  32 ++++
 12 files changed, 506 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 1f61d4a..0e68972 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -34,6 +34,8 @@ import io.swagger.annotations.Example;
 import io.swagger.annotations.ExampleProperty;
 import java.nio.ByteBuffer;
 import java.time.Clock;
+import java.util.List;
+import java.util.stream.Collectors;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -48,6 +50,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
@@ -55,9 +58,13 @@ import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -106,6 +113,7 @@ public class SchemasResource extends AdminResource {
         @ApiResponse(code = 403, message = "Client is not authenticated"),
         @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
         @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+        @ApiResponse(code = 500, message = "Internal Server Error"),
     })
     public void getSchema(
         @PathParam("tenant") String tenant,
@@ -134,6 +142,7 @@ public class SchemasResource extends AdminResource {
         @ApiResponse(code = 403, message = "Client is not authenticated"),
         @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
         @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+        @ApiResponse(code = 500, message = "Internal Server Error"),
     })
     public void getSchema(
         @PathParam("tenant") String tenant,
@@ -156,6 +165,35 @@ public class SchemasResource extends AdminResource {
             });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/schemas")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Get the all schemas of a topic", response = GetAllVersionsSchemaResponse.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
+            @ApiResponse(code = 403, message = "Client is not authenticated"),
+            @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
+            @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+    })
+    public void getAllSchemas(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
+
+        String schemaId = buildSchemaId(tenant, namespace, topic);
+        pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId)
+                .handle((schema, error) -> {
+                    handleGetAllSchemasResponse(response, schema, error);
+                    return null;
+                });
+    }
+
     private static void handleGetSchemaResponse(AsyncResponse response,
                                                 SchemaAndMetadata schema, Throwable error) {
         if (isNull(error)) {
@@ -167,13 +205,7 @@ public class SchemasResource extends AdminResource {
                 response.resume(
                     Response.ok()
                         .encoding(MediaType.APPLICATION_JSON)
-                        .entity(GetSchemaResponse.builder()
-                            .version(getLongSchemaVersion(schema.version))
-                            .type(schema.schema.getType())
-                            .timestamp(schema.schema.getTimestamp())
-                            .data(new String(schema.schema.getData(), UTF_8))
-                            .properties(schema.schema.getProps())
-                            .build()
+                        .entity(convertSchemaAndMetadataToGetSchemaResponse(schema)
                         ).build()
                 );
             }
@@ -183,6 +215,27 @@ public class SchemasResource extends AdminResource {
 
     }
 
+    private static void handleGetAllSchemasResponse(AsyncResponse response,
+                                                    List<SchemaAndMetadata> schemas, Throwable error) {
+        if (isNull(error)) {
+            if (isNull(schemas)) {
+                response.resume(Response.status(Response.Status.NOT_FOUND).build());
+            } else {
+                response.resume(
+                        Response.ok()
+                                .encoding(MediaType.APPLICATION_JSON)
+                                .entity(GetAllVersionsSchemaResponse.builder().getSchemaResponses(
+                                        schemas.stream().map(SchemasResource::convertSchemaAndMetadataToGetSchemaResponse)
+                                                .collect(Collectors.toList())
+                                        ).build()
+                                ).build()
+                );
+            }
+        } else {
+            response.resume(error);
+        }
+    }
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/schema")
     @Produces(MediaType.APPLICATION_JSON)
@@ -193,6 +246,7 @@ public class SchemasResource extends AdminResource {
         @ApiResponse(code = 403, message = "Client is not authenticated"),
         @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
         @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+        @ApiResponse(code = 500, message = "Internal Server Error"),
     })
     public void deleteSchema(
         @PathParam("tenant") String tenant,
@@ -234,6 +288,7 @@ public class SchemasResource extends AdminResource {
         @ApiResponse(code = 409, message = "Incompatible schema"),
         @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
         @ApiResponse(code = 422, message = "Invalid schema data"),
+        @ApiResponse(code = 500, message = "Internal Server Error"),
     })
     public void postSchema(
         @PathParam("tenant") String tenant,
@@ -291,6 +346,130 @@ public class SchemasResource extends AdminResource {
         });
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/compatibility")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "test the schema compatibility", response = IsCompatibilityResponse.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
+            @ApiResponse(code = 403, message = "Client is not authenticated"),
+            @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
+            @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+    })
+    public void testCompatibility(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @ApiParam(
+                    value = "A JSON value presenting a schema playload. An example of the expected schema can be found down"
+                            + " here.",
+                    examples = @Example(
+                            value = @ExampleProperty(
+                                    mediaType = MediaType.APPLICATION_JSON,
+                                    value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }"
+                            )
+                    )
+            )
+                    PostSchemaPayload payload,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
+
+        String schemaId = buildSchemaId(tenant, namespace, topic);
+
+        SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy
+                .fromAutoUpdatePolicy(getNamespacePolicies(NamespaceName.get(tenant, namespace))
+                        .schema_auto_update_compatibility_strategy);
+
+        pulsar().getSchemaRegistryService().isCompatible(schemaId, SchemaData.builder()
+                        .data(payload.getSchema().getBytes(Charsets.UTF_8))
+                        .isDeleted(false)
+                        .timestamp(clock.millis())
+                        .type(SchemaType.valueOf(payload.getType()))
+                        .user(defaultIfEmpty(clientAppId(), ""))
+                        .props(payload.getProperties())
+                        .build(),
+                schemaCompatibilityStrategy).thenAccept(isCompatible -> response.resume(
+                                        Response.accepted().entity(
+                                                IsCompatibilityResponse
+                                                .builder()
+                                                .isCompatibility(isCompatible)
+                                                .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name())
+                                                .build()
+                                        ).build())).exceptionally(error -> {
+                response.resume(Response.serverError().build());
+                return null;
+        });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/version")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "get the version of the schema", response = LongSchemaVersion.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
+            @ApiResponse(code = 403, message = "Client is not authenticated"),
+            @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
+            @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+            @ApiResponse(code = 422, message = "Invalid schema data"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+    })
+    public void getVersionBySchema(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") String topic,
+            @ApiParam(
+                    value = "A JSON value presenting a schema playload. An example of the expected schema can be found down"
+                            + " here.",
+                    examples = @Example(
+                            value = @ExampleProperty(
+                                    mediaType = MediaType.APPLICATION_JSON,
+                                    value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }"
+                            )
+                    )
+            )
+                    PostSchemaPayload payload,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
+
+        String schemaId = buildSchemaId(tenant, namespace, topic);
+
+        pulsar().getSchemaRegistryService().findSchemaVersion(schemaId, SchemaData.builder()
+                .data(payload.getSchema().getBytes(Charsets.UTF_8))
+                .isDeleted(false)
+                .timestamp(clock.millis())
+                .type(SchemaType.valueOf(payload.getType()))
+                .user(defaultIfEmpty(clientAppId(), ""))
+                .props(payload.getProperties())
+                .build()).thenAccept(version ->
+                        response.resume(
+                                Response.accepted()
+                                        .entity(LongSchemaVersionResponse.builder()
+                                                .version(version).build()).build()))
+                .exceptionally(error -> {
+                                    response.resume(Response.serverError().build());
+                                    return null;
+        });
+    }
+
+    private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(SchemaAndMetadata schemaAndMetadata) {
+        return GetSchemaResponse.builder()
+                .version(getLongSchemaVersion(schemaAndMetadata.version))
+                .type(schemaAndMetadata.schema.getType())
+                .timestamp(schemaAndMetadata.schema.getTimestamp())
+                .data(new String(schemaAndMetadata.schema.getData(), UTF_8))
+                .properties(schemaAndMetadata.schema.getProps())
+                .build();
+    }
+
     private String buildSchemaId(String tenant, String namespace, String topic) {
         TopicName topicName = TopicName.get("persistent", tenant, namespace, topic);
         if (topicName.isPartitioned()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
index fe63b34..53bb7e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -42,6 +42,16 @@ public class DefaultSchemaRegistryService implements SchemaRegistryService {
         return completedFuture(Collections.emptyList());
     }
 
+    @Override
+    public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
+        return completedFuture(Collections.emptyList());
+    }
+
+    @Override
+    public CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData) {
+        return completedFuture(NO_SCHEMA_VERSION);
+    }
+
 
     @Override
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
index 9b9f591..515600f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -42,6 +42,10 @@ public interface SchemaRegistry extends AutoCloseable {
     CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
                                                              SchemaCompatibilityStrategy strategy);
 
+    CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId);
+
+    CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData);
+
     SchemaVersion versionFromBytes(byte[] version);
 
     class SchemaAndMetadata {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 73244cf..d031ca6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 public interface SchemaRegistryService extends SchemaRegistry {
     String CreateMethodName = "create";
     Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);
+    long NO_SCHEMA_VERSION = -1L;
 
     static Map<SchemaType, SchemaCompatibilityCheck> getCheckers(Set<String> checkerClasses) throws Exception {
         Map<SchemaType, SchemaCompatibilityCheck> checkers = Maps.newHashMap();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index e2975a1..c930385 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -34,6 +34,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.time.Clock;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -220,6 +221,19 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
             .isCompatible(existingSchema.schema, newSchema, strategy);
     }
 
+    public CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData) {
+        return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> {
+            HashCode newHash = hashFunction.hashBytes(schemaData.getData());
+            for (SchemaAndMetadata schemaAndMetadata:schemaAndMetadataList) {
+                if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(), newHash.asBytes())) {
+                    return completedFuture(((LongSchemaVersion)schemaStorage
+                            .versionFromBytes(schemaAndMetadata.version.bytes())).getVersion());
+                }
+            }
+            return completedFuture(NO_SCHEMA_VERSION);
+        });
+    }
+
     private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
                                                                     SchemaCompatibilityStrategy strategy) {
         return getSchema(schemaId)
@@ -241,7 +255,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
             );
     }
 
-    private CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
+    public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
         return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
             // Trim the prefix of schemas before the latest delete.
             int lastIndex = list.size() - 1;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
index f0ca3fc..c6c8b77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
@@ -63,6 +63,16 @@ public class SchemaRegistryServiceWithSchemaDataValidator implements SchemaRegis
     }
 
     @Override
+    public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
+        return this.service.trimDeletedSchemaAndGetList(schemaId);
+    }
+
+    @Override
+    public CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData) {
+        return this.service.findSchemaVersion(schemaId, schemaData);
+    }
+
+    @Override
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
                                                               SchemaData schema,
                                                               SchemaCompatibilityStrategy strategy) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index d998fdd..0c2a174 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -35,7 +35,10 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -111,6 +114,12 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void findSchemaVersionTest() throws Exception {
+        putSchema(schemaId1, schema1, version(0));
+        assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1, schema1).get().longValue());
+    }
+
+    @Test
     public void deleteSchemaAndAddSchema() throws Exception {
         putSchema(schemaId1, schema1, version(0));
         SchemaData latest = getLatestSchema(schemaId1, version(0));
@@ -243,6 +252,32 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         putSchema(schemaId1, schema1, version(0));
     }
 
+
+    @Test
+    public void trimDeletedSchemaAndGetListTest() throws Exception {
+        List<SchemaAndMetadata> list = new ArrayList<>();
+        CompletableFuture<SchemaVersion> put = schemaRegistryService.putSchemaIfAbsent(
+                schemaId1, schema1, SchemaCompatibilityStrategy.FULL);
+        SchemaVersion newVersion = put.get();
+        list.add(new SchemaAndMetadata(schemaId1, schema1, newVersion));
+        put = schemaRegistryService.putSchemaIfAbsent(
+                schemaId1, schema2, SchemaCompatibilityStrategy.FULL);
+        newVersion = put.get();
+        list.add(new SchemaAndMetadata(schemaId1, schema2, newVersion));
+        List<SchemaAndMetadata> list1 = schemaRegistryService.trimDeletedSchemaAndGetList(schemaId1).get();
+        assertEquals(list.size(), list1.size());
+        HashFunction hashFunction = Hashing.sha256();
+        for (int i = 0; i < list.size(); i++) {
+            SchemaAndMetadata schemaAndMetadata1 = list.get(i);
+            SchemaAndMetadata schemaAndMetadata2 = list1.get(i);
+            assertEquals(hashFunction.hashBytes(schemaAndMetadata1.schema.getData()).asBytes(),
+                    hashFunction.hashBytes(schemaAndMetadata2.schema.getData()).asBytes());
+            assertEquals(((LongSchemaVersion)schemaAndMetadata1.version).getVersion()
+                    , ((LongSchemaVersion)schemaAndMetadata2.version).getVersion());
+            assertEquals(schemaAndMetadata1.id, schemaAndMetadata2.id);
+        }
+    }
+
     @Test
     public void dontReAddExistingSchemaInMiddle() throws Exception {
         putSchema(schemaId1, randomSchema(), version(0));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
index 9be3c63..fa7dcca 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -18,9 +18,15 @@
  */
 package org.apache.pulsar.client.admin;
 
+import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
+import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
+import java.util.List;
+
 /**
  * Admin interface on interacting with schemas.
  */
@@ -71,4 +77,48 @@ public interface Schemas {
      */
     void createSchema(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;
 
+    /**
+     * Judge schema compatibility <tt>topic</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @param schemaPayload schema payload
+     * @throws PulsarAdminException
+     */
+    IsCompatibilityResponse testCompatibility(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;
+
+    /**
+     * Find schema version <tt>topic</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @param schemaPayload schema payload
+     * @throws PulsarAdminException
+     */
+    Long getVersionBySchema(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;
+
+    /**
+     * Judge schema compatibility <tt>topic</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @param schemaInfo schema info
+     * @throws PulsarAdminException
+     */
+    IsCompatibilityResponse testCompatibility(String topic, SchemaInfo schemaInfo) throws PulsarAdminException;
+
+    /**
+     * Find schema version <tt>topic</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @param schemaInfo schema info
+     * @throws PulsarAdminException
+     */
+    Long getVersionBySchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException;
+
+    /**
+     * Get all version schemas <tt>topic</tt>.
+     *
+     * @param topic topic name, in fully qualified format
+     * @throws PulsarAdminException
+     */
+    List<SchemaInfo> getAllSchemas(String topic) throws PulsarAdminException;
+
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index cce5cd2..868a807 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -28,10 +28,16 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 public class SchemasImpl extends BaseResource implements Schemas {
 
     private final WebTarget target;
@@ -76,14 +82,8 @@ public class SchemasImpl extends BaseResource implements Schemas {
 
     @Override
     public void createSchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException {
-        PostSchemaPayload payload = new PostSchemaPayload();
-        payload.setType(schemaInfo.getType().name());
-        payload.setProperties(schemaInfo.getProperties());
-        // for backward compatibility concern, we convert `bytes` to `string`
-        // we can consider fixing it in a new version of rest endpoint
-        payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema()));
 
-        createSchema(topic, payload);
+        createSchema(topic, convertSchemaInfoToPostSchemaPayload(schemaInfo));
     }
 
     @Override
@@ -97,6 +97,60 @@ public class SchemasImpl extends BaseResource implements Schemas {
         }
     }
 
+    @Override
+    public IsCompatibilityResponse testCompatibility(String topic, PostSchemaPayload payload) throws PulsarAdminException {
+        try {
+            TopicName tn = TopicName.get(topic);
+            return request(compatibilityPath(tn)).post(Entity.json(payload), IsCompatibilityResponse.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public Long getVersionBySchema(String topic, PostSchemaPayload payload) throws PulsarAdminException {
+        try {
+            return request(versionPath(TopicName.get(topic))).post(Entity.json(payload), LongSchemaVersionResponse.class).getVersion();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public IsCompatibilityResponse testCompatibility(String topic, SchemaInfo schemaInfo) throws PulsarAdminException {
+        try {
+            return request(compatibilityPath(TopicName.get(topic)))
+                    .post(Entity.json(convertSchemaInfoToPostSchemaPayload(schemaInfo)), IsCompatibilityResponse.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public Long getVersionBySchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException {
+        try {
+            return request(versionPath(TopicName.get(topic)))
+                    .post(Entity.json(convertSchemaInfoToPostSchemaPayload(schemaInfo)), LongSchemaVersionResponse.class).getVersion();
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public List<SchemaInfo> getAllSchemas(String topic) throws PulsarAdminException {
+        try {
+            TopicName topicName = TopicName.get(topic);
+            return request(schemasPath(TopicName.get(topic)))
+                    .get(GetAllVersionsSchemaResponse.class)
+                    .getGetSchemaResponses().stream()
+                    .map(getSchemaResponse -> convertGetSchemaResponseToSchemaInfo(topicName, getSchemaResponse))
+                    .collect(Collectors.toList());
+
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget schemaPath(TopicName topicName) {
         return target
             .path(topicName.getTenant())
@@ -105,6 +159,30 @@ public class SchemasImpl extends BaseResource implements Schemas {
             .path("schema");
     }
 
+    private WebTarget versionPath(TopicName topicName) {
+        return target
+                .path(topicName.getTenant())
+                .path(topicName.getNamespacePortion())
+                .path(topicName.getEncodedLocalName())
+                .path("version");
+    }
+
+    private WebTarget schemasPath(TopicName topicName) {
+        return target
+                .path(topicName.getTenant())
+                .path(topicName.getNamespacePortion())
+                .path(topicName.getEncodedLocalName())
+                .path("schemas");
+    }
+
+    private WebTarget compatibilityPath(TopicName topicName) {
+        return target
+                .path(topicName.getTenant())
+                .path(topicName.getNamespacePortion())
+                .path(topicName.getEncodedLocalName())
+                .path("compatibility");
+    }
+
     // the util function converts `GetSchemaResponse` to `SchemaInfo`
     static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
                                                            GetSchemaResponse response) {
@@ -125,4 +203,14 @@ public class SchemasImpl extends BaseResource implements Schemas {
 
         return new String(schemaData, UTF_8);
     }
+
+    static PostSchemaPayload convertSchemaInfoToPostSchemaPayload(SchemaInfo schemaInfo) {
+        PostSchemaPayload payload = new PostSchemaPayload();
+        payload.setType(schemaInfo.getType().name());
+        payload.setProperties(schemaInfo.getProperties());
+        // for backward compatibility concern, we convert `bytes` to `string`
+        // we can consider fixing it in a new version of rest endpoint
+        payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema()));
+        return payload;
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java
new file mode 100644
index 0000000..e65d50f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetAllVersionsSchemaResponse {
+    private List<GetSchemaResponse> getSchemaResponses;
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java
new file mode 100644
index 0000000..26ae2ef
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class IsCompatibilityResponse {
+    boolean isCompatibility;
+    String schemaCompatibilityStrategy;
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java
new file mode 100644
index 0000000..be55083
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class LongSchemaVersionResponse {
+    Long version;
+}