You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/01 18:20:59 UTC

[incubator-pulsar] branch master updated: Return an error if schema is incompatible (#1692)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 28eb372  Return an error if schema is incompatible (#1692)
28eb372 is described below

commit 28eb37204306b54c253fc1d5ae7898455624def4
Author: Dave Rusek <da...@gmail.com>
AuthorDate: Tue May 1 12:20:57 2018 -0600

    Return an error if schema is incompatible (#1692)
    
    * Return an error if schema is incompatible
    
    * Handle string versions correctly
    
    * Handle string versions correctly
    
    * fix header and update status code for incompatible schemas
---
 .../pulsar/broker/admin/v2/SchemasResource.java    | 24 +++++++++++++-----
 .../schema/IncompatibleSchemaException.java        | 29 ++++++++++++++++++++++
 .../service/schema/SchemaRegistryServiceImpl.java  |  2 +-
 3 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index f6d79cc..0f6b996 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.admin.v2;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Objects.isNull;
 import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
 import static org.apache.pulsar.common.util.Codec.decode;
@@ -27,8 +25,9 @@ import static org.apache.pulsar.common.util.Codec.decode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import io.swagger.annotations.ApiOperation;
+import java.nio.ByteBuffer;
 import java.time.Clock;
-import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.Encoded;
@@ -42,7 +41,7 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.DeleteSchemaResponse;
@@ -86,6 +85,8 @@ public class SchemasResource extends AdminResource {
                 if (isNull(error)) {
                     if (isNull(schema)) {
                         response.resume(Response.status(Response.Status.NOT_FOUND).build());
+                    } else if (schema.schema.isDeleted()) {
+                        response.resume(Response.noContent());
                     } else {
                         response.resume(
                             Response.ok()
@@ -122,7 +123,9 @@ public class SchemasResource extends AdminResource {
         validateDestinationAndAdminOperation(tenant, namespace, topic);
 
         String schemaId = buildSchemaId(tenant, namespace, topic);
-        SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(version.getBytes());
+        ByteBuffer bbVersion = ByteBuffer.allocate(Long.SIZE);
+        bbVersion.putLong(Long.parseLong(version));
+        SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
         pulsar().getSchemaRegistryService().getSchema(schemaId, v)
             .handle((schema, error) -> {
                 if (isNull(error)) {
@@ -212,7 +215,16 @@ public class SchemasResource extends AdminResource {
                         .build()
                 ).build()
             )
-        );
+        ).exceptionally(error -> {
+            if (error instanceof IncompatibleSchemaException) {
+                response.resume(Response.status(Response.Status.CONFLICT).build());
+            } else {
+                response.resume(
+                    Response.serverError().build()
+                );
+            }
+            return null;
+        });
     }
 
     private String buildSchemaId(String tenant, String namespace, String topic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
new file mode 100644
index 0000000..975ba0d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+public class IncompatibleSchemaException extends Exception {
+    public IncompatibleSchemaException() {
+        super("Incompatible schema used");
+    }
+
+    public IncompatibleSchemaException(String message) {
+        super(message);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 30e9b47..b1a7d2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -97,7 +97,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                     .build();
                 return schemaStorage.put(schemaId, info.toByteArray(), context);
             } else {
-                return FutureUtil.failedFuture(new Exception());
+                return FutureUtil.failedFuture(new IncompatibleSchemaException());
             }
         });
     }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.