You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/10/04 00:22:49 UTC

[pulsar] branch master updated: Fix: Brokers redirect Schema REST requests in a loop when bundle doesn't have a leader (#2712)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 606b4de  Fix: Brokers redirect Schema REST requests in a loop when bundle doesn't have a leader (#2712)
606b4de is described below

commit 606b4de3429708cf2e008e913214a44c408e662b
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Oct 3 17:22:44 2018 -0700

    Fix: Brokers redirect Schema REST requests in a loop when bundle doesn't have a leader (#2712)
---
 .../pulsar/broker/admin/v2/SchemasResource.java       | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index ab323aa..6076276 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -35,12 +35,14 @@ import java.nio.ByteBuffer;
 import java.time.Clock;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
@@ -105,9 +107,10 @@ public class SchemasResource extends AdminResource {
         @PathParam("tenant") String tenant,
         @PathParam("namespace") String namespace,
         @PathParam("topic") String topic,
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
         @Suspended final AsyncResponse response
     ) {
-        validateDestinationAndAdminOperation(tenant, namespace, topic);
+        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
 
         String schemaId = buildSchemaId(tenant, namespace, topic);
         pulsar().getSchemaRegistryService().getSchema(schemaId)
@@ -155,9 +158,10 @@ public class SchemasResource extends AdminResource {
         @PathParam("namespace") String namespace,
         @PathParam("topic") String topic,
         @PathParam("version") @Encoded String version,
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
         @Suspended final AsyncResponse response
     ) {
-        validateDestinationAndAdminOperation(tenant, namespace, topic);
+        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
 
         String schemaId = buildSchemaId(tenant, namespace, topic);
         ByteBuffer bbVersion = ByteBuffer.allocate(Long.SIZE);
@@ -206,9 +210,10 @@ public class SchemasResource extends AdminResource {
         @PathParam("tenant") String tenant,
         @PathParam("namespace") String namespace,
         @PathParam("topic") String topic,
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
         @Suspended final AsyncResponse response
     ) {
-        validateDestinationAndAdminOperation(tenant, namespace, topic);
+        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
 
         String schemaId = buildSchemaId(tenant, namespace, topic);
         pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""))
@@ -255,9 +260,10 @@ public class SchemasResource extends AdminResource {
             )
         )
         PostSchemaPayload payload,
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
         @Suspended final AsyncResponse response
     ) {
-        validateDestinationAndAdminOperation(tenant, namespace, topic);
+        validateDestinationAndAdminOperation(tenant, namespace, topic, authoritative);
 
         pulsar().getSchemaRegistryService().putSchemaIfAbsent(
             buildSchemaId(tenant, namespace, topic),
@@ -292,14 +298,15 @@ public class SchemasResource extends AdminResource {
         return TopicName.get("persistent", tenant, namespace, topic).getSchemaName();
     }
 
-    private void validateDestinationAndAdminOperation(String tenant, String namespace, String topic) {
+    private void validateDestinationAndAdminOperation(String tenant, String namespace, String topic,
+                                                      boolean authoritative) {
         TopicName destinationName = TopicName.get(
             "persistent", tenant, namespace, decode(topic)
         );
 
         try {
             validateAdminAccessForTenant(destinationName.getTenant());
-            validateTopicOwnership(destinationName, false);
+            validateTopicOwnership(destinationName, authoritative);
         } catch (RestException e) {
             if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
                 throw new RestException(Response.Status.NOT_FOUND, "Not Found");