You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/10/04 00:22:49 UTC
[pulsar] branch master updated: Fix: Brokers redirect Schema REST requests in a loop when bundle doesn't have a leader (#2712)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 606b4de Fix: Brokers redirect Schema REST requests in a loop when bundle doesn't have a leader (#2712)
606b4de is described below
commit 606b4de3429708cf2e008e913214a44c408e662b
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Oct 3 17:22:44 2018 -0700
Fix: Brokers redirect Schema REST requests in a loop when bundle doesn't have a leader (#2712)
---
.../pulsar/broker/admin/v2/SchemasResource.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index ab323aa..6076276 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -35,12 +35,14 @@ import java.nio.ByteBuffer;
import java.time.Clock;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
@@ -105,9 +107,10 @@ public class SchemasResource extends AdminResource {
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@Suspended final AsyncResponse response
) {
- validateDestinationAndAdminOperation(tenant, namespace, topic);
+ validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
String schemaId = buildSchemaId(tenant, namespace, topic);
pulsar().getSchemaRegistryService().getSchema(schemaId)
@@ -155,9 +158,10 @@ public class SchemasResource extends AdminResource {
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@PathParam("version") @Encoded String version,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@Suspended final AsyncResponse response
) {
- validateDestinationAndAdminOperation(tenant, namespace, topic);
+ validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
String schemaId = buildSchemaId(tenant, namespace, topic);
ByteBuffer bbVersion = ByteBuffer.allocate(Long.SIZE);
@@ -206,9 +210,10 @@ public class SchemasResource extends AdminResource {
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@Suspended final AsyncResponse response
) {
- validateDestinationAndAdminOperation(tenant, namespace, topic);
+ validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
String schemaId = buildSchemaId(tenant, namespace, topic);
pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""))
@@ -255,9 +260,10 @@ public class SchemasResource extends AdminResource {
)
)
PostSchemaPayload payload,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@Suspended final AsyncResponse response
) {
- validateDestinationAndAdminOperation(tenant, namespace, topic);
+ validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
pulsar().getSchemaRegistryService().putSchemaIfAbsent(
buildSchemaId(tenant, namespace, topic),
@@ -292,14 +298,15 @@ public class SchemasResource extends AdminResource {
return TopicName.get("persistent", tenant, namespace, topic).getSchemaName();
}
- private void validateDestinationAndAdminOperation(String tenant, String namespace, String topic) {
+ private void validateDestinationAndAdminOperation(String tenant, String namespace, String topic,
+ boolean authoritative) {
TopicName destinationName = TopicName.get(
"persistent", tenant, namespace, decode(topic)
);
try {
validateAdminAccessForTenant(destinationName.getTenant());
- validateTopicOwnership(destinationName, false);
+ validateTopicOwnership(destinationName, authoritative);
} catch (RestException e) {
if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
throw new RestException(Response.Status.NOT_FOUND, "Not Found");