You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/01 18:20:59 UTC
[incubator-pulsar] branch master updated: Return an error if schema
is incompatible (#1692)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 28eb372 Return an error if schema is incompatible (#1692)
28eb372 is described below
commit 28eb37204306b54c253fc1d5ae7898455624def4
Author: Dave Rusek <da...@gmail.com>
AuthorDate: Tue May 1 12:20:57 2018 -0600
Return an error if schema is incompatible (#1692)
* Return an error if schema is incompatible
* Handle string versions correctly
* Handle string versions correctly
* fix header and update status code for incompatible schemas
---
.../pulsar/broker/admin/v2/SchemasResource.java | 24 +++++++++++++-----
.../schema/IncompatibleSchemaException.java | 29 ++++++++++++++++++++++
.../service/schema/SchemaRegistryServiceImpl.java | 2 +-
3 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index f6d79cc..0f6b996 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.admin.v2;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.isNull;
import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
import static org.apache.pulsar.common.util.Codec.decode;
@@ -27,8 +25,9 @@ import static org.apache.pulsar.common.util.Codec.decode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import io.swagger.annotations.ApiOperation;
+import java.nio.ByteBuffer;
import java.time.Clock;
-import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.Encoded;
@@ -42,7 +41,7 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.DeleteSchemaResponse;
@@ -86,6 +85,8 @@ public class SchemasResource extends AdminResource {
if (isNull(error)) {
if (isNull(schema)) {
response.resume(Response.status(Response.Status.NOT_FOUND).build());
+ } else if (schema.schema.isDeleted()) {
+ response.resume(Response.noContent());
} else {
response.resume(
Response.ok()
@@ -122,7 +123,9 @@ public class SchemasResource extends AdminResource {
validateDestinationAndAdminOperation(tenant, namespace, topic);
String schemaId = buildSchemaId(tenant, namespace, topic);
- SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(version.getBytes());
+ ByteBuffer bbVersion = ByteBuffer.allocate(Long.SIZE);
+ bbVersion.putLong(Long.parseLong(version));
+ SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
pulsar().getSchemaRegistryService().getSchema(schemaId, v)
.handle((schema, error) -> {
if (isNull(error)) {
@@ -212,7 +215,16 @@ public class SchemasResource extends AdminResource {
.build()
).build()
)
- );
+ ).exceptionally(error -> {
+ if (error instanceof IncompatibleSchemaException) {
+ response.resume(Response.status(Response.Status.CONFLICT).build());
+ } else {
+ response.resume(
+ Response.serverError().build()
+ );
+ }
+ return null;
+ });
}
private String buildSchemaId(String tenant, String namespace, String topic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
new file mode 100644
index 0000000..975ba0d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+public class IncompatibleSchemaException extends Exception {
+ public IncompatibleSchemaException() {
+ super("Incompatible schema used");
+ }
+
+ public IncompatibleSchemaException(String message) {
+ super(message);
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 30e9b47..b1a7d2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -97,7 +97,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
.build();
return schemaStorage.put(schemaId, info.toByteArray(), context);
} else {
- return FutureUtil.failedFuture(new Exception());
+ return FutureUtil.failedFuture(new IncompatibleSchemaException());
}
});
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.