You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/27 05:35:58 UTC

[pulsar] branch master updated: fix add partition failed when bundle unloaded (#6856)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bbec7eb  fix add partition failed when bundle unloaded (#6856)
bbec7eb is described below

commit bbec7eb6ca30bef10b83d46839ef152e3267c656
Author: hangc0276 <ha...@163.com>
AuthorDate: Wed May 27 13:35:45 2020 +0800

    fix add partition failed when bundle unloaded (#6856)
    
    ### Motivation
    When a topic with high input/output load, we will get add partition failed, the failed log as follows:
    ```
    Failed to perform http post request: org.asynchttpclient.handler.MaxRedirectException: Maximum redirect reached: 5
    null
    
    Reason: org.asynchttpclient.handler.MaxRedirectException: Maximum redirect reached: 5
    ```
    
    ### Bug description
    The reason is when the topic with high load, the topic's bundle will be unload. In the same time, we call pulsar admin to add partition for the topic, the request will post to one broker A,  broker A can't find the topic-bundle's owner, it will redirect the request to the leader broker B, broker B find a candidate broker C to own the bundle, and redirect request to  broker C with authoritative flag.
    
    However, broker C can't receive the authoritative flag (that's the bug), and can't find the topic-bundle's owner and has no authoritative flag, so it redirect the request to the leader broker B and goes on in cycle. In the end it reaches the max redirect limit and failed.
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 ++--
 .../java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java  | 8 +++++---
 .../java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java  | 4 +++-
 .../java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 2 +-
 4 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ece5371..a052191 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -468,8 +468,8 @@ public class PersistentTopicsBase extends AdminResource {
      *
      * @param numPartitions
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) {
-        validateWriteOperationOnTopic(false);
+    protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly, boolean authoritative) {
+        validateWriteOperationOnTopic(authoritative);
         // Only do the validation if it's the first hop.
         if (!updateLocalTopicOnly) {
             validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 3c56f1c..c3003e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -46,8 +46,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
-import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -57,6 +55,8 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.ApiParam;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,9 +186,11 @@ public class PersistentTopics extends PersistentTopicsBase {
     public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             int numPartitions) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly);
+        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index dcd2ed8..4721c05 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -278,10 +278,12 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
             int numPartitions) {
         validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly);
+        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative);
     }
 
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 96b4aa3..452e2e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -288,7 +288,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
-        persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, 10);
+        persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, false, 10);
     }
 
     @Test