You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/27 14:19:13 UTC
[pulsar] branch master updated: [fix][admin] Unwrap the completion exception on schemas admin API (#15733)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 20390f0982b [fix][admin] Unwrap the completion exception on schemas admin API (#15733)
20390f0982b is described below
commit 20390f0982b85d26aa2ab60cb647792d5c337436
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Fri May 27 22:19:07 2022 +0800
[fix][admin] Unwrap the completion exception on schemas admin API (#15733)
---
.../broker/admin/impl/SchemasResourceBase.java | 30 ++++++++++++----------
.../pulsar/broker/admin/AdminApiSchemaTest.java | 21 +++++++++++++++
2 files changed, 38 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index dc801975470..761b7727532 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -157,28 +158,30 @@ public class SchemasResourceBase extends AdminResource {
.thenAccept(version -> response.resume(
Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
.exceptionally(error -> {
- if (error.getCause() instanceof IncompatibleSchemaException) {
+ Throwable throwable = FutureUtil.unwrapCompletionException(error);
+ if (throwable instanceof IncompatibleSchemaException) {
response.resume(Response
- .status(Response.Status.CONFLICT.getStatusCode(), error.getCause().getMessage())
+ .status(Response.Status.CONFLICT.getStatusCode(), throwable.getMessage())
.build());
- } else if (error instanceof InvalidSchemaDataException) {
+ } else if (throwable instanceof InvalidSchemaDataException) {
response.resume(Response.status(422, /* Unprocessable Entity */
- error.getMessage()).build());
+ throwable.getMessage()).build());
} else {
- log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, error);
- response.resume(new RestException(error));
+ log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
+ response.resume(new RestException(throwable));
}
return null;
});
}).exceptionally(error -> {
- if (error.getCause() instanceof RestException) {
+ Throwable throwable = FutureUtil.unwrapCompletionException(error);
+ if (throwable instanceof RestException) {
// Unprocessable Entity
response.resume(Response
- .status(((RestException) error.getCause()).getResponse().getStatus(), error.getMessage())
+ .status(((RestException) throwable).getResponse().getStatus(), throwable.getMessage())
.build());
} else {
- log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, error);
- response.resume(new RestException(error));
+ log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
+ response.resume(new RestException(throwable));
}
return null;
});
@@ -200,7 +203,7 @@ public class SchemasResourceBase extends AdminResource {
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.build())))
.exceptionally(error -> {
- response.resume(new RestException(error));
+ response.resume(new RestException(FutureUtil.unwrapCompletionException(error)));
return null;
});
}
@@ -220,8 +223,9 @@ public class SchemasResourceBase extends AdminResource {
.thenAccept(version -> response.resume(Response.accepted()
.entity(LongSchemaVersionResponse.builder().version(version).build()).build()))
.exceptionally(error -> {
- log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, error);
- response.resume(new RestException(error));
+ Throwable throwable = FutureUtil.unwrapCompletionException(error);
+ log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, throwable);
+ response.resume(new RestException(throwable));
return null;
});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index deaf0dce86d..cd1a3b781ac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -49,7 +49,9 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
+import org.apache.pulsar.common.schema.SchemaType;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -247,6 +249,25 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
assertEquals(keyValueSchema.getSchemaInfo(), schemaInfo);
}
+
+    // Posting an empty AVRO definition must be rejected as invalid schema data: the admin client
+    // is expected to surface HTTP 422 together with the broker's error message.
+    @Test(dataProvider = "version")
+    public void testInvalidSchemaDataException(ApiVersion version) {
+        String namespace = format("%s%s%s", "schematest", (ApiVersion.V1.equals(version) ? "/" + cluster + "/" : "/"),
+                "test");
+        String topicName = "persistent://" + namespace + "/test-invalid-schema-data-exception";
+        SchemaInfo schemaInfo = SchemaInfo.builder().schema(new byte[0]).type(SchemaType.AVRO).name("test").build();
+        try {
+            admin.schemas().createSchema(topicName, schemaInfo);
+            // Without this the test would pass vacuously if the schema were accepted.
+            Assert.fail("Expected createSchema to be rejected with HTTP 422 for empty AVRO schema data");
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 422);
+            Assert.assertEquals(e.getMessage(), "HTTP 422 Invalid schema definition data for AVRO schema");
+        }
+    }
+
@Test
void getTopicIntervalStateIncludeSchemaStoreLedger() throws PulsarAdminException {
String topicName = "persistent://schematest/test/get-schema-ledger-info";