You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/30 16:47:25 UTC
[pulsar] branch master updated: Add the schema admin api (#4800)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d19575 Add the schema admin api (#4800)
8d19575 is described below
commit 8d19575358c5fbf6d777ff922e8d205bd3254a49
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jul 31 00:47:17 2019 +0800
Add the schema admin api (#4800)
### Motivation
Follow-up to #4782.
### Verifying this change
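Added unit tests for the new schema registry methods (`findSchemaVersionTest` and `trimDeletedSchemaAndGetListTest` in `SchemaServiceTest`).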
---
.../pulsar/broker/admin/v2/SchemasResource.java | 193 ++++++++++++++++++++-
.../schema/DefaultSchemaRegistryService.java | 10 ++
.../broker/service/schema/SchemaRegistry.java | 4 +
.../service/schema/SchemaRegistryService.java | 1 +
.../service/schema/SchemaRegistryServiceImpl.java | 16 +-
...hemaRegistryServiceWithSchemaDataValidator.java | 10 ++
.../broker/service/schema/SchemaServiceTest.java | 35 ++++
.../org/apache/pulsar/client/admin/Schemas.java | 50 ++++++
.../pulsar/client/admin/internal/SchemasImpl.java | 102 ++++++++++-
.../schema/GetAllVersionsSchemaResponse.java | 34 ++++
.../protocol/schema/IsCompatibilityResponse.java | 34 ++++
.../protocol/schema/LongSchemaVersionResponse.java | 32 ++++
12 files changed, 506 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 1f61d4a..0e68972 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -34,6 +34,8 @@ import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import java.nio.ByteBuffer;
import java.time.Clock;
+import java.util.List;
+import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -48,6 +50,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
@@ -55,9 +58,13 @@ import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -106,6 +113,7 @@ public class SchemasResource extends AdminResource {
@ApiResponse(code = 403, message = "Client is not authenticated"),
@ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
@ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
})
public void getSchema(
@PathParam("tenant") String tenant,
@@ -134,6 +142,7 @@ public class SchemasResource extends AdminResource {
@ApiResponse(code = 403, message = "Client is not authenticated"),
@ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
@ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
})
public void getSchema(
@PathParam("tenant") String tenant,
@@ -156,6 +165,35 @@ public class SchemasResource extends AdminResource {
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/schemas")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the all schemas of a topic", response = GetAllVersionsSchemaResponse.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
+ @ApiResponse(code = 403, message = "Client is not authenticated"),
+ @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist; or Schema is not found for this topic"),
+ @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
+ })
+ public void getAllSchemas(
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @Suspended final AsyncResponse response
+ ) {
+ validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
+
+ String schemaId = buildSchemaId(tenant, namespace, topic);
+ pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId)
+ .handle((schema, error) -> {
+ handleGetAllSchemasResponse(response, schema, error);
+ return null;
+ });
+ }
+
private static void handleGetSchemaResponse(AsyncResponse response,
SchemaAndMetadata schema, Throwable error) {
if (isNull(error)) {
@@ -167,13 +205,7 @@ public class SchemasResource extends AdminResource {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
- .entity(GetSchemaResponse.builder()
- .version(getLongSchemaVersion(schema.version))
- .type(schema.schema.getType())
- .timestamp(schema.schema.getTimestamp())
- .data(new String(schema.schema.getData(), UTF_8))
- .properties(schema.schema.getProps())
- .build()
+ .entity(convertSchemaAndMetadataToGetSchemaResponse(schema)
).build()
);
}
@@ -183,6 +215,27 @@ public class SchemasResource extends AdminResource {
}
+ private static void handleGetAllSchemasResponse(AsyncResponse response,
+ List<SchemaAndMetadata> schemas, Throwable error) {
+ if (isNull(error)) {
+ if (isNull(schemas)) {
+ response.resume(Response.status(Response.Status.NOT_FOUND).build());
+ } else {
+ response.resume(
+ Response.ok()
+ .encoding(MediaType.APPLICATION_JSON)
+ .entity(GetAllVersionsSchemaResponse.builder().getSchemaResponses(
+ schemas.stream().map(SchemasResource::convertSchemaAndMetadataToGetSchemaResponse)
+ .collect(Collectors.toList())
+ ).build()
+ ).build()
+ );
+ }
+ } else {
+ response.resume(error);
+ }
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@@ -193,6 +246,7 @@ public class SchemasResource extends AdminResource {
@ApiResponse(code = 403, message = "Client is not authenticated"),
@ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
@ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
})
public void deleteSchema(
@PathParam("tenant") String tenant,
@@ -234,6 +288,7 @@ public class SchemasResource extends AdminResource {
@ApiResponse(code = 409, message = "Incompatible schema"),
@ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
@ApiResponse(code = 422, message = "Invalid schema data"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
})
public void postSchema(
@PathParam("tenant") String tenant,
@@ -291,6 +346,130 @@ public class SchemasResource extends AdminResource {
});
}
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/compatibility")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "test the schema compatibility", response = IsCompatibilityResponse.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
+ @ApiResponse(code = 403, message = "Client is not authenticated"),
+ @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
+ @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
+ })
+ public void testCompatibility(
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @ApiParam(
+ value = "A JSON value presenting a schema playload. An example of the expected schema can be found down"
+ + " here.",
+ examples = @Example(
+ value = @ExampleProperty(
+ mediaType = MediaType.APPLICATION_JSON,
+ value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }"
+ )
+ )
+ )
+ PostSchemaPayload payload,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @Suspended final AsyncResponse response
+ ) {
+ validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
+
+ String schemaId = buildSchemaId(tenant, namespace, topic);
+
+ SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy
+ .fromAutoUpdatePolicy(getNamespacePolicies(NamespaceName.get(tenant, namespace))
+ .schema_auto_update_compatibility_strategy);
+
+ pulsar().getSchemaRegistryService().isCompatible(schemaId, SchemaData.builder()
+ .data(payload.getSchema().getBytes(Charsets.UTF_8))
+ .isDeleted(false)
+ .timestamp(clock.millis())
+ .type(SchemaType.valueOf(payload.getType()))
+ .user(defaultIfEmpty(clientAppId(), ""))
+ .props(payload.getProperties())
+ .build(),
+ schemaCompatibilityStrategy).thenAccept(isCompatible -> response.resume(
+ Response.accepted().entity(
+ IsCompatibilityResponse
+ .builder()
+ .isCompatibility(isCompatible)
+ .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name())
+ .build()
+ ).build())).exceptionally(error -> {
+ response.resume(Response.serverError().build());
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/version")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "get the version of the schema", response = LongSchemaVersion.class)
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Client is not authorized or Don't have admin permission"),
+ @ApiResponse(code = 403, message = "Client is not authenticated"),
+ @ApiResponse(code = 404, message = "Tenant or Namespace or Topic doesn't exist"),
+ @ApiResponse(code = 412, message = "Failed to find the ownership for the topic"),
+ @ApiResponse(code = 422, message = "Invalid schema data"),
+ @ApiResponse(code = 500, message = "Internal Server Error"),
+ })
+ public void getVersionBySchema(
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @ApiParam(
+ value = "A JSON value presenting a schema playload. An example of the expected schema can be found down"
+ + " here.",
+ examples = @Example(
+ value = @ExampleProperty(
+ mediaType = MediaType.APPLICATION_JSON,
+ value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }"
+ )
+ )
+ )
+ PostSchemaPayload payload,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @Suspended final AsyncResponse response
+ ) {
+ validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
+
+ String schemaId = buildSchemaId(tenant, namespace, topic);
+
+ pulsar().getSchemaRegistryService().findSchemaVersion(schemaId, SchemaData.builder()
+ .data(payload.getSchema().getBytes(Charsets.UTF_8))
+ .isDeleted(false)
+ .timestamp(clock.millis())
+ .type(SchemaType.valueOf(payload.getType()))
+ .user(defaultIfEmpty(clientAppId(), ""))
+ .props(payload.getProperties())
+ .build()).thenAccept(version ->
+ response.resume(
+ Response.accepted()
+ .entity(LongSchemaVersionResponse.builder()
+ .version(version).build()).build()))
+ .exceptionally(error -> {
+ response.resume(Response.serverError().build());
+ return null;
+ });
+ }
+
+ private static GetSchemaResponse convertSchemaAndMetadataToGetSchemaResponse(SchemaAndMetadata schemaAndMetadata) {
+ return GetSchemaResponse.builder()
+ .version(getLongSchemaVersion(schemaAndMetadata.version))
+ .type(schemaAndMetadata.schema.getType())
+ .timestamp(schemaAndMetadata.schema.getTimestamp())
+ .data(new String(schemaAndMetadata.schema.getData(), UTF_8))
+ .properties(schemaAndMetadata.schema.getProps())
+ .build();
+ }
+
private String buildSchemaId(String tenant, String namespace, String topic) {
TopicName topicName = TopicName.get("persistent", tenant, namespace, topic);
if (topicName.isPartitioned()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
index fe63b34..53bb7e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -42,6 +42,16 @@ public class DefaultSchemaRegistryService implements SchemaRegistryService {
return completedFuture(Collections.emptyList());
}
+ @Override
+ public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
+ return completedFuture(Collections.emptyList());
+ }
+
+ @Override
+ public CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData) {
+ return completedFuture(NO_SCHEMA_VERSION);
+ }
+
@Override
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
index 9b9f591..515600f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -42,6 +42,10 @@ public interface SchemaRegistry extends AutoCloseable {
CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy);
+ CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId);
+
+ CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData);
+
SchemaVersion versionFromBytes(byte[] version);
class SchemaAndMetadata {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 73244cf..d031ca6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
public interface SchemaRegistryService extends SchemaRegistry {
String CreateMethodName = "create";
Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);
+ long NO_SCHEMA_VERSION = -1L;
static Map<SchemaType, SchemaCompatibilityCheck> getCheckers(Set<String> checkerClasses) throws Exception {
Map<SchemaType, SchemaCompatibilityCheck> checkers = Maps.newHashMap();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index e2975a1..c930385 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -34,6 +34,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -220,6 +221,19 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
.isCompatible(existingSchema.schema, newSchema, strategy);
}
+ public CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData) {
+ return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> {
+ HashCode newHash = hashFunction.hashBytes(schemaData.getData());
+ for (SchemaAndMetadata schemaAndMetadata:schemaAndMetadataList) {
+ if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(), newHash.asBytes())) {
+ return completedFuture(((LongSchemaVersion)schemaStorage
+ .versionFromBytes(schemaAndMetadata.version.bytes())).getVersion());
+ }
+ }
+ return completedFuture(NO_SCHEMA_VERSION);
+ });
+ }
+
private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
return getSchema(schemaId)
@@ -241,7 +255,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
);
}
- private CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
+ public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
// Trim the prefix of schemas before the latest delete.
int lastIndex = list.size() - 1;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
index f0ca3fc..c6c8b77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
@@ -63,6 +63,16 @@ public class SchemaRegistryServiceWithSchemaDataValidator implements SchemaRegis
}
@Override
+ public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
+ return this.service.trimDeletedSchemaAndGetList(schemaId);
+ }
+
+ @Override
+ public CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData) {
+ return this.service.findSchemaVersion(schemaId, schemaData);
+ }
+
+ @Override
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
SchemaData schema,
SchemaCompatibilityStrategy strategy) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index d998fdd..0c2a174 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -35,7 +35,10 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -111,6 +114,12 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void findSchemaVersionTest() throws Exception {
+ putSchema(schemaId1, schema1, version(0));
+ assertEquals(0, schemaRegistryService.findSchemaVersion(schemaId1, schema1).get().longValue());
+ }
+
+ @Test
public void deleteSchemaAndAddSchema() throws Exception {
putSchema(schemaId1, schema1, version(0));
SchemaData latest = getLatestSchema(schemaId1, version(0));
@@ -243,6 +252,32 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
putSchema(schemaId1, schema1, version(0));
}
+
+ @Test
+ public void trimDeletedSchemaAndGetListTest() throws Exception {
+ List<SchemaAndMetadata> list = new ArrayList<>();
+ CompletableFuture<SchemaVersion> put = schemaRegistryService.putSchemaIfAbsent(
+ schemaId1, schema1, SchemaCompatibilityStrategy.FULL);
+ SchemaVersion newVersion = put.get();
+ list.add(new SchemaAndMetadata(schemaId1, schema1, newVersion));
+ put = schemaRegistryService.putSchemaIfAbsent(
+ schemaId1, schema2, SchemaCompatibilityStrategy.FULL);
+ newVersion = put.get();
+ list.add(new SchemaAndMetadata(schemaId1, schema2, newVersion));
+ List<SchemaAndMetadata> list1 = schemaRegistryService.trimDeletedSchemaAndGetList(schemaId1).get();
+ assertEquals(list.size(), list1.size());
+ HashFunction hashFunction = Hashing.sha256();
+ for (int i = 0; i < list.size(); i++) {
+ SchemaAndMetadata schemaAndMetadata1 = list.get(i);
+ SchemaAndMetadata schemaAndMetadata2 = list1.get(i);
+ assertEquals(hashFunction.hashBytes(schemaAndMetadata1.schema.getData()).asBytes(),
+ hashFunction.hashBytes(schemaAndMetadata2.schema.getData()).asBytes());
+ assertEquals(((LongSchemaVersion)schemaAndMetadata1.version).getVersion()
+ , ((LongSchemaVersion)schemaAndMetadata2.version).getVersion());
+ assertEquals(schemaAndMetadata1.id, schemaAndMetadata2.id);
+ }
+ }
+
@Test
public void dontReAddExistingSchemaInMiddle() throws Exception {
putSchema(schemaId1, randomSchema(), version(0));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
index 9be3c63..fa7dcca 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -18,9 +18,15 @@
*/
package org.apache.pulsar.client.admin;
+import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
+import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
+import java.util.List;
+
/**
* Admin interface on interacting with schemas.
*/
@@ -71,4 +77,48 @@ public interface Schemas {
*/
void createSchema(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;
+ /**
+ * Test the compatibility of the given schema payload against <tt>topic</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @param schemaPayload schema payload
+ * @throws PulsarAdminException
+ */
+ IsCompatibilityResponse testCompatibility(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;
+
+ /**
+ * Find the version of the given schema payload on <tt>topic</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @param schemaPayload schema payload
+ * @throws PulsarAdminException
+ */
+ Long getVersionBySchema(String topic, PostSchemaPayload schemaPayload) throws PulsarAdminException;
+
+ /**
+ * Test the compatibility of the given schema info against <tt>topic</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @param schemaInfo schema info
+ * @throws PulsarAdminException
+ */
+ IsCompatibilityResponse testCompatibility(String topic, SchemaInfo schemaInfo) throws PulsarAdminException;
+
+ /**
+ * Find the version of the given schema info on <tt>topic</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @param schemaInfo schema info
+ * @throws PulsarAdminException
+ */
+ Long getVersionBySchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException;
+
+ /**
+ * Get all schema versions of <tt>topic</tt>.
+ *
+ * @param topic topic name, in fully qualified format
+ * @throws PulsarAdminException
+ */
+ List<SchemaInfo> getAllSchemas(String topic) throws PulsarAdminException;
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index cce5cd2..868a807 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -28,10 +28,16 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.SchemaInfo;
+import java.util.List;
+import java.util.stream.Collectors;
+
public class SchemasImpl extends BaseResource implements Schemas {
private final WebTarget target;
@@ -76,14 +82,8 @@ public class SchemasImpl extends BaseResource implements Schemas {
@Override
public void createSchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException {
- PostSchemaPayload payload = new PostSchemaPayload();
- payload.setType(schemaInfo.getType().name());
- payload.setProperties(schemaInfo.getProperties());
- // for backward compatibility concern, we convert `bytes` to `string`
- // we can consider fixing it in a new version of rest endpoint
- payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema()));
- createSchema(topic, payload);
+ createSchema(topic, convertSchemaInfoToPostSchemaPayload(schemaInfo));
}
@Override
@@ -97,6 +97,60 @@ public class SchemasImpl extends BaseResource implements Schemas {
}
}
+ @Override
+ public IsCompatibilityResponse testCompatibility(String topic, PostSchemaPayload payload) throws PulsarAdminException {
+ try {
+ TopicName tn = TopicName.get(topic);
+ return request(compatibilityPath(tn)).post(Entity.json(payload), IsCompatibilityResponse.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public Long getVersionBySchema(String topic, PostSchemaPayload payload) throws PulsarAdminException {
+ try {
+ return request(versionPath(TopicName.get(topic))).post(Entity.json(payload), LongSchemaVersionResponse.class).getVersion();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public IsCompatibilityResponse testCompatibility(String topic, SchemaInfo schemaInfo) throws PulsarAdminException {
+ try {
+ return request(compatibilityPath(TopicName.get(topic)))
+ .post(Entity.json(convertSchemaInfoToPostSchemaPayload(schemaInfo)), IsCompatibilityResponse.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public Long getVersionBySchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException {
+ try {
+ return request(versionPath(TopicName.get(topic)))
+ .post(Entity.json(convertSchemaInfoToPostSchemaPayload(schemaInfo)), LongSchemaVersionResponse.class).getVersion();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public List<SchemaInfo> getAllSchemas(String topic) throws PulsarAdminException {
+ try {
+ TopicName topicName = TopicName.get(topic);
+ return request(schemasPath(TopicName.get(topic)))
+ .get(GetAllVersionsSchemaResponse.class)
+ .getGetSchemaResponses().stream()
+ .map(getSchemaResponse -> convertGetSchemaResponseToSchemaInfo(topicName, getSchemaResponse))
+ .collect(Collectors.toList());
+
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
private WebTarget schemaPath(TopicName topicName) {
return target
.path(topicName.getTenant())
@@ -105,6 +159,30 @@ public class SchemasImpl extends BaseResource implements Schemas {
.path("schema");
}
+ private WebTarget versionPath(TopicName topicName) {
+ return target
+ .path(topicName.getTenant())
+ .path(topicName.getNamespacePortion())
+ .path(topicName.getEncodedLocalName())
+ .path("version");
+ }
+
+ private WebTarget schemasPath(TopicName topicName) {
+ return target
+ .path(topicName.getTenant())
+ .path(topicName.getNamespacePortion())
+ .path(topicName.getEncodedLocalName())
+ .path("schemas");
+ }
+
+ private WebTarget compatibilityPath(TopicName topicName) {
+ return target
+ .path(topicName.getTenant())
+ .path(topicName.getNamespacePortion())
+ .path(topicName.getEncodedLocalName())
+ .path("compatibility");
+ }
+
// the util function converts `GetSchemaResponse` to `SchemaInfo`
static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
GetSchemaResponse response) {
@@ -125,4 +203,14 @@ public class SchemasImpl extends BaseResource implements Schemas {
return new String(schemaData, UTF_8);
}
+
+ static PostSchemaPayload convertSchemaInfoToPostSchemaPayload(SchemaInfo schemaInfo) {
+ PostSchemaPayload payload = new PostSchemaPayload();
+ payload.setType(schemaInfo.getType().name());
+ payload.setProperties(schemaInfo.getProperties());
+ // for backward compatibility concern, we convert `bytes` to `string`
+ // we can consider fixing it in a new version of rest endpoint
+ payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema()));
+ return payload;
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java
new file mode 100644
index 0000000..e65d50f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/GetAllVersionsSchemaResponse.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class GetAllVersionsSchemaResponse {
+ private List<GetSchemaResponse> getSchemaResponses;
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java
new file mode 100644
index 0000000..26ae2ef
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class IsCompatibilityResponse {
+ boolean isCompatibility;
+ String schemaCompatibilityStrategy;
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java
new file mode 100644
index 0000000..be55083
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/LongSchemaVersionResponse.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol.schema;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class LongSchemaVersionResponse {
+ Long version;
+}