You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/27 14:19:13 UTC

[pulsar] branch master updated: [fix][admin] Unwrap the completion exception on schemas admin API (#15733)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 20390f0982b [fix][admin] Unwrap the completion exception on schemas admin API (#15733)
20390f0982b is described below

commit 20390f0982b85d26aa2ab60cb647792d5c337436
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Fri May 27 22:19:07 2022 +0800

    [fix][admin] Unwrap the completion exception on schemas admin API (#15733)
---
 .../broker/admin/impl/SchemasResourceBase.java     | 30 ++++++++++++----------
 .../pulsar/broker/admin/AdminApiSchemaTest.java    | 21 +++++++++++++++
 2 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index dc801975470..761b7727532 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -157,28 +158,30 @@ public class SchemasResourceBase extends AdminResource {
                     .thenAccept(version -> response.resume(
                             Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
                     .exceptionally(error -> {
-                        if (error.getCause() instanceof IncompatibleSchemaException) {
+                        Throwable throwable = FutureUtil.unwrapCompletionException(error);
+                        if (throwable instanceof IncompatibleSchemaException) {
                             response.resume(Response
-                                    .status(Response.Status.CONFLICT.getStatusCode(), error.getCause().getMessage())
+                                    .status(Response.Status.CONFLICT.getStatusCode(), throwable.getMessage())
                                     .build());
-                        } else if (error instanceof InvalidSchemaDataException) {
+                        } else if (throwable instanceof InvalidSchemaDataException) {
                             response.resume(Response.status(422, /* Unprocessable Entity */
-                                    error.getMessage()).build());
+                                    throwable.getMessage()).build());
                         } else {
-                            log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, error);
-                            response.resume(new RestException(error));
+                            log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
+                            response.resume(new RestException(throwable));
                         }
                         return null;
                     });
         }).exceptionally(error -> {
-            if (error.getCause() instanceof RestException) {
+            Throwable throwable = FutureUtil.unwrapCompletionException(error);
+            if (throwable instanceof RestException) {
                 // Unprocessable Entity
                 response.resume(Response
-                        .status(((RestException) error.getCause()).getResponse().getStatus(), error.getMessage())
+                        .status(((RestException) throwable).getResponse().getStatus(), throwable.getMessage())
                         .build());
             } else {
-                log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, error);
-                response.resume(new RestException(error));
+                log.error("[{}] Failed to post schema for topic {}", clientAppId(), topicName, throwable);
+                response.resume(new RestException(throwable));
             }
             return null;
         });
@@ -200,7 +203,7 @@ public class SchemasResourceBase extends AdminResource {
                                         .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
                                 .build())))
                 .exceptionally(error -> {
-                    response.resume(new RestException(error));
+                    response.resume(new RestException(FutureUtil.unwrapCompletionException(error)));
                     return null;
                 });
     }
@@ -220,8 +223,9 @@ public class SchemasResourceBase extends AdminResource {
                 .thenAccept(version -> response.resume(Response.accepted()
                         .entity(LongSchemaVersionResponse.builder().version(version).build()).build()))
                 .exceptionally(error -> {
-                    log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, error);
-                    response.resume(new RestException(error));
+                    Throwable throwable = FutureUtil.unwrapCompletionException(error);
+                    log.error("[{}] Failed to get version by schema for topic {}", clientAppId(), topicName, throwable);
+                    response.resume(new RestException(throwable));
                     return null;
                 });
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index deaf0dce86d..cd1a3b781ac 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -49,7 +49,9 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -247,6 +249,25 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
         assertEquals(keyValueSchema.getSchemaInfo(), schemaInfo);
     }
 
+
+    /** Creating an AVRO schema with empty schema data must be rejected with HTTP 422 (Unprocessable Entity). */
+    @Test(dataProvider = "version")
+    public void testInvalidSchemaDataException(ApiVersion version) {
+        String namespace = format("%s%s%s", "schematest", (ApiVersion.V1.equals(version) ? "/" + cluster + "/" : "/"),
+                "test");
+        String topicName = "persistent://" + namespace + "/test-invalid-schema-data-exception";
+        SchemaInfo schemaInfo = SchemaInfo.builder()
+                .schema(new byte[0]).type(SchemaType.AVRO).name("test").build();
+        try {
+            admin.schemas().createSchema(topicName, schemaInfo);
+            // Reaching this line means the schema was accepted; fail instead of passing vacuously.
+            Assert.fail("Expected PulsarAdminException (HTTP 422) for empty AVRO schema data");
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 422);
+            Assert.assertEquals(e.getMessage(), "HTTP 422 Invalid schema definition data for AVRO schema");
+        }
+    }
+
     @Test
     void getTopicIntervalStateIncludeSchemaStoreLedger() throws PulsarAdminException {
         String topicName = "persistent://schematest/test/get-schema-ledger-info";