You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/06/02 01:56:08 UTC
[pulsar] branch master updated: [improve][broker] Make some methods in SchemasResourceBase async. (#15821)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8bcb921d973 [improve][broker] Make some methods in SchemasResourceBase async. (#15821)
8bcb921d973 is described below
commit 8bcb921d973c96ea09d885921186e4040304ebb1
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Jun 2 09:56:00 2022 +0800
[improve][broker] Make some methods in SchemasResourceBase async. (#15821)
### Motivation
See PIP #14365 and change tracker #15043.
Make the `getSchema` / `deleteSchema` / `postSchema` / `getVersionBySchema` / `testCompatibility` methods in SchemasResourceBase purely async.
---
.../broker/admin/impl/SchemasResourceBase.java | 129 ++++++++++++-
.../pulsar/broker/admin/v2/SchemasResource.java | 199 +++++++++++++--------
.../pulsar/broker/admin/AdminApiSchemaTest.java | 2 +-
3 files changed, 253 insertions(+), 77 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 761b7727532..88487c11296 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -27,17 +27,21 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
@@ -67,7 +71,7 @@ public class SchemasResourceBase extends AdminResource {
this.clock = clock;
}
- private static long getLongSchemaVersion(SchemaVersion schemaVersion) {
+ protected static long getLongSchemaVersion(SchemaVersion schemaVersion) {
if (schemaVersion instanceof LongSchemaVersion) {
return ((LongSchemaVersion) schemaVersion).getVersion();
} else {
@@ -92,6 +96,12 @@ public class SchemasResourceBase extends AdminResource {
});
}
+ public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative) {
+ return validateDestinationAndAdminOperationAsync(authoritative)
+ .thenApply(__ -> getSchemaId())
+ .thenCompose(schemaId -> pulsar().getSchemaRegistryService().getSchema(schemaId));
+ }
+
public void getSchema(boolean authoritative, String version, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
String schemaId = getSchemaId();
@@ -104,6 +114,18 @@ public class SchemasResourceBase extends AdminResource {
});
}
+ public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean authoritative, String version) {
+ return validateDestinationAndAdminOperationAsync(authoritative)
+ .thenApply(__ -> getSchemaId())
+ .thenCompose(schemaId -> {
+ ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
+ bbVersion.putLong(Long.parseLong(version));
+ SchemaRegistryService schemaRegistryService = pulsar().getSchemaRegistryService();
+ SchemaVersion schemaVersion = schemaRegistryService.versionFromBytes(bbVersion.array());
+ return schemaRegistryService.getSchema(schemaId, schemaVersion);
+ });
+ }
+
public void getAllSchemas(boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
@@ -114,6 +136,14 @@ public class SchemasResourceBase extends AdminResource {
});
}
+ public CompletableFuture<List<SchemaAndMetadata>> getAllSchemasAsync(boolean authoritative) {
+ return validateDestinationAndAdminOperationAsync(authoritative)
+ .thenCompose(__ -> {
+ String schemaId = getSchemaId();
+ return pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId);
+ });
+ }
+
public void deleteSchema(boolean authoritative, AsyncResponse response, boolean force) {
validateDestinationAndAdminOperation(authoritative);
@@ -132,6 +162,15 @@ public class SchemasResourceBase extends AdminResource {
});
}
+ public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean authoritative, boolean force) {
+ return validateDestinationAndAdminOperationAsync(authoritative)
+ .thenCompose(__ -> {
+ String schemaId = getSchemaId();
+ return pulsar().getSchemaRegistryService()
+ .deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""), force);
+ });
+ }
+
public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
@@ -187,6 +226,33 @@ public class SchemasResourceBase extends AdminResource {
});
}
+ public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload payload, boolean authoritative) {
+ return validateDestinationAndAdminOperationAsync(authoritative)
+ .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
+ .thenCompose(schemaCompatibilityStrategy -> {
+ byte[] data;
+ if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
+ try {
+ data = DefaultImplementation.getDefaultImplementation()
+ .convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema()
+ .getBytes(Charsets.UTF_8));
+ } catch (IOException conversionError) {
+ throw new RestException(conversionError);
+ }
+ } else {
+ data = payload.getSchema().getBytes(Charsets.UTF_8);
+ }
+ return pulsar().getSchemaRegistryService()
+ .putSchemaIfAbsent(getSchemaId(),
+ SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
+ .type(SchemaType.valueOf(payload.getType()))
+ .user(defaultIfEmpty(clientAppId(), ""))
+ .props(payload.getProperties())
+ .build(),
+ schemaCompatibilityStrategy);
+ });
+ }
+
public void testCompatibility(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
@@ -208,9 +274,24 @@ public class SchemasResourceBase extends AdminResource {
});
}
- public void getVersionBySchema(
+ public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompatibilityAsync(
+ PostSchemaPayload payload, boolean authoritative) {
+ return validateDestinationAndAdminOperationAsync(authoritative)
+ .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
+ .thenCompose(strategy -> {
+ String schemaId = getSchemaId();
+ return pulsar().getSchemaRegistryService().isCompatible(schemaId,
+ SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8))
+ .isDeleted(false)
+ .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
+ .user(defaultIfEmpty(clientAppId(), ""))
+ .props(payload.getProperties())
+ .build(), strategy)
+ .thenApply(v -> Pair.of(v, strategy));
+ });
+ }
- PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
+ public void getVersionBySchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);
String schemaId = getSchemaId();
@@ -230,6 +311,20 @@ public class SchemasResourceBase extends AdminResource {
});
}
+ public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) {
+ return validateDestinationAndAdminOperationAsync(authoritative)
+ .thenCompose(__ -> {
+ String schemaId = getSchemaId();
+ return pulsar().getSchemaRegistryService()
+ .findSchemaVersion(schemaId,
+ SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8))
+ .isDeleted(false).timestamp(clock.millis())
+ .type(SchemaType.valueOf(payload.getType()))
+ .user(defaultIfEmpty(clientAppId(), ""))
+ .props(payload.getProperties()).build());
+ });
+ }
+
@Override
protected String domain() {
return "persistent";
@@ -253,7 +348,7 @@ public class SchemasResourceBase extends AdminResource {
}
}
- private static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
+ protected static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
if (isNull(error)) {
if (isNull(schema)) {
response.resume(Response.status(
@@ -272,6 +367,27 @@ public class SchemasResourceBase extends AdminResource {
}
+ protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata schema) {
+ if (isNull(schema)) {
+ throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema not found");
+ } else if (schema.schema.isDeleted()) {
+ throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted");
+ }
+ return convertSchemaAndMetadataToGetSchemaResponse(schema);
+ }
+
+ protected GetAllVersionsSchemaResponse convertToAllVersionsSchemaResponse(List<SchemaAndMetadata> schemas) {
+ if (isNull(schemas)) {
+ throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found");
+ } else {
+ return GetAllVersionsSchemaResponse.builder()
+ .getSchemaResponses(schemas.stream()
+ .map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
+ .collect(Collectors.toList()))
+ .build();
+ }
+ }
+
private static void handleGetAllSchemasResponse(AsyncResponse response, List<SchemaAndMetadata> schemas,
Throwable error) {
if (isNull(error)) {
@@ -306,5 +422,10 @@ public class SchemasResourceBase extends AdminResource {
}
}
+ private CompletableFuture<Void> validateDestinationAndAdminOperationAsync(boolean authoritative) {
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateAdminAccessForTenantAsync(topicName.getTenant()));
+ }
+
private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index 1ac06057429..e75eed8cd7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -39,14 +39,20 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
+import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
@Path("/schemas")
@Api(
@@ -54,6 +60,7 @@ import org.apache.pulsar.common.schema.LongSchemaVersion;
description = "Schemas related admin APIs",
tags = "schemas"
)
+@Slf4j
public class SchemasResource extends SchemasResourceBase {
@VisibleForTesting
@@ -75,14 +82,22 @@ public class SchemasResource extends SchemasResourceBase {
@ApiResponse(code = 500, message = "Internal Server Error"),
})
public void getSchema(
- @PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace,
- @PathParam("topic") String topic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @Suspended final AsyncResponse response
- ) {
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @Suspended final AsyncResponse response) {
validateTopicName(tenant, namespace, topic);
- getSchema(authoritative, response);
+ getSchemaAsync(authoritative)
+ .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata))
+ .thenApply(response::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get schema for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
@@ -99,15 +114,24 @@ public class SchemasResource extends SchemasResourceBase {
@ApiResponse(code = 500, message = "Internal Server Error"),
})
public void getSchema(
- @PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace,
- @PathParam("topic") String topic,
- @PathParam("version") @Encoded String version,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @Suspended final AsyncResponse response
- ) {
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @PathParam("version") @Encoded String version,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @Suspended final AsyncResponse response) {
validateTopicName(tenant, namespace, topic);
- getSchema(authoritative, version, response);
+ getSchemaAsync(authoritative, version)
+ .thenApply(schemaAndMetadata -> convertToSchemaResponse(schemaAndMetadata))
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get schema for topic {} with version {}",
+ clientAppId(), topicName, version, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
@@ -128,10 +152,18 @@ public class SchemasResource extends SchemasResourceBase {
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @Suspended final AsyncResponse response
- ) {
+ @Suspended final AsyncResponse response) {
validateTopicName(tenant, namespace, topic);
- getAllSchemas(authoritative, response);
+ getAllSchemasAsync(authoritative)
+ .thenApply(schemaAndMetadata -> convertToAllVersionsSchemaResponse(schemaAndMetadata))
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get all schemas for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@DELETE
@@ -147,15 +179,24 @@ public class SchemasResource extends SchemasResourceBase {
@ApiResponse(code = 500, message = "Internal Server Error"),
})
public void deleteSchema(
- @PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace,
- @PathParam("topic") String topic,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @QueryParam("force") @DefaultValue("false") boolean force,
- @Suspended final AsyncResponse response
- ) {
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("force") @DefaultValue("false") boolean force,
+ @Suspended final AsyncResponse response) {
validateTopicName(tenant, namespace, topic);
- deleteSchema(authoritative, response, force);
+ deleteSchemaAsync(authoritative, force)
+ .thenAccept(version -> {
+ response.resume(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to delete schemas for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@POST
@@ -174,25 +215,36 @@ public class SchemasResource extends SchemasResourceBase {
@ApiResponse(code = 500, message = "Internal Server Error"),
})
public void postSchema(
- @PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace,
- @PathParam("topic") String topic,
- @ApiParam(
- value = "A JSON value presenting a schema playload. An example of the expected schema can be found down"
- + " here.",
- examples = @Example(
- value = @ExampleProperty(
- mediaType = MediaType.APPLICATION_JSON,
- value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }"
- )
- )
- )
- PostSchemaPayload payload,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @Suspended final AsyncResponse response
- ) {
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") String topic,
+ @ApiParam(value = "A JSON value presenting a schema playload."
+ + " An example of the expected schema can be found down here.",
+ examples = @Example(value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
+ value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { \"key1\" : \"value1\" + } }")))
+ PostSchemaPayload payload,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @Suspended final AsyncResponse response) {
validateTopicName(tenant, namespace, topic);
- postSchema(payload, authoritative, response);
+ postSchemaAsync(payload, authoritative)
+ .thenAccept(version -> response.resume(PostSchemaResponse.builder().version(version).build()))
+ .exceptionally(ex -> {
+ Throwable root = FutureUtil.unwrapCompletionException(ex);
+ if (root instanceof IncompatibleSchemaException) {
+ response.resume(Response
+ .status(Response.Status.CONFLICT.getStatusCode(), root.getMessage())
+ .build());
+ } else if (root instanceof InvalidSchemaDataException) {
+ response.resume(Response.status(422, /* Unprocessable Entity */
+ root.getMessage()).build());
+ } else {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to post schemas for topic {}", clientAppId(), topicName, root);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ }
+ return null;
+ });
}
@POST
@@ -212,23 +264,26 @@ public class SchemasResource extends SchemasResourceBase {
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
- @ApiParam(
- value = "A JSON value presenting a schema playload."
+ @ApiParam(value = "A JSON value presenting a schema playload."
+ " An example of the expected schema can be found down here.",
- examples = @Example(
- value = @ExampleProperty(
- mediaType = MediaType.APPLICATION_JSON,
- value = "{\"type\": \"STRING\", \"schema\": \"\","
- + " \"properties\": { \"key1\" : \"value1\" + } }"
- )
- )
- )
- PostSchemaPayload payload,
+ examples = @Example(value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
+ value = "{\"type\": \"STRING\", \"schema\": \"\"," + " \"properties\": { \"key1\" : \"value1\" + } }")))
+ PostSchemaPayload payload,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @Suspended final AsyncResponse response
- ) {
+ @Suspended final AsyncResponse response) {
validateTopicName(tenant, namespace, topic);
- testCompatibility(payload, authoritative, response);
+ testCompatibilityAsync(payload, authoritative)
+ .thenAccept(pair -> response.resume(Response.accepted()
+ .entity(IsCompatibilityResponse.builder().isCompatibility(pair.getLeft())
+ .schemaCompatibilityStrategy(pair.getRight().name()).build())
+ .build()))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to test compatibility for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@POST
@@ -249,22 +304,22 @@ public class SchemasResource extends SchemasResourceBase {
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
- @ApiParam(
- value = "A JSON value presenting a schema playload."
+ @ApiParam(value = "A JSON value presenting a schema playload."
+ " An example of the expected schema can be found down here.",
- examples = @Example(
- value = @ExampleProperty(
- mediaType = MediaType.APPLICATION_JSON,
- value = "{\"type\": \"STRING\", \"schema\": \"\","
- + " \"properties\": { \"key1\" : \"value1\" + } }"
- )
- )
- )
- PostSchemaPayload payload,
+ examples = @Example(value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
+ value = "{\"type\": \"STRING\", \"schema\": \"\"," + " \"properties\": { \"key1\" : \"value1\" + } }")))
+ PostSchemaPayload payload,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @Suspended final AsyncResponse response
- ) {
+ @Suspended final AsyncResponse response) {
validateTopicName(tenant, namespace, topic);
- getVersionBySchema(payload, authoritative, response);
+ getVersionBySchemaAsync(payload, authoritative)
+ .thenAccept(version -> response.resume(LongSchemaVersionResponse.builder().version(version).build()))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index cd1a3b781ac..acbe1f7c246 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -212,7 +212,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
admin.schemas().createSchema(topicName, fooSchemaInfo);
fail("Should have failed");
} catch (PulsarAdminException.NotFoundException e) {
- assertTrue(e.getMessage().contains("HTTP 404"));
+ assertTrue(e.getMessage().contains("Namespace does not exist"));
}
}