You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/25 04:20:48 UTC

[pulsar] branch branch-2.10 updated (3f946c79806 -> 98d1cc94d7a)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 3f946c79806 [improve][security] Add load multiple certificates support in TrustManagerProxy (#14798)
     new a600b148349 [fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)
     new aef1854c53f Update/fix Swagger Annotation for param: authoritative (#16222)
     new 6196bf15abd [improve][client] Add message key if exists to deadLetter messages (#16615)
     new ea6c3cc33b4 [enh] Broker/EntryFilter: make the delay for RESCHEDULED messages configurable (dispatcherFilterRescheduledMessageDelay) (#16602)
     new 14a10b2ee24 [fix][broker] BadVersionException when splitting bundles, delay 100ms and try again (#16612)
     new a0133e91c5b [fix][broker] The configuration loadBalancerNamespaceMaximumBundles is invalid (#16552)
     new 8840591ea81 [fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription (#16670)
     new 4da2ccf89db [fix][broker] Retry to delete the namespace if new topics created during the namespace deletion (#16676)
     new 35eec778297 [improve][security] Upgrade aws-java-sdk-s3 to 1.12.261 (#16684)
     new 37c4873ce18 [fix][proxy] Do not preserve host when forwarding admin requests. (#16342)
     new 0ab33bf3013 [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)
     new 98d1cc94d7a Fix newLookup TooManyRequestsException message (#16594)

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |   2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 +
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  26 ++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  12 +-
 .../broker/admin/v2/NonPersistentTopics.java       |  10 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 229 +++++++++++----------
 .../loadbalance/impl/BundleSplitterTask.java       |  12 +-
 .../pulsar/broker/namespace/NamespaceService.java  |   6 +-
 .../broker/service/AbstractBaseDispatcher.java     |   2 +-
 .../PersistentDispatcherMultipleConsumers.java     |   3 +
 ...istentStickyKeyDispatcherMultipleConsumers.java |   6 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   6 +-
 .../loadbalance/impl/BundleSplitterTaskTest.java   |  52 +++++
 .../pulsar/client/api/DeadLetterTopicTest.java     |  60 ++++++
 .../client/api/SimpleProducerConsumerTest.java     |  51 +++++
 .../client/api/v1/V1_ProducerConsumerTest.java     |   2 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  10 +-
 .../functions/worker/rest/api/ComponentImpl.java   |  13 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   1 -
 .../proxy/server/AuthedAdminProxyHandlerTest.java  |   1 -
 .../SuperUserAuthedAdminProxyHandlerTest.java      |   1 -
 .../server/UnauthedAdminProxyHandlerTest.java      |   1 -
 ...stStateSource.java => TestByteStateSource.java} |  27 +--
 .../integration/functions/PulsarStateTest.java     |  69 ++++++-
 25 files changed, 437 insertions(+), 174 deletions(-)
 copy tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/{TestStateSource.java => TestByteStateSource.java} (61%)


[pulsar] 03/12: [improve][client] Add message key if exists to deadLetter messages (#16615)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6196bf15abdc7840b33ce3c32c190224b2c2fe5c
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Fri Jul 15 18:03:29 2022 +0800

    [improve][client] Add message key if exists to deadLetter messages (#16615)
    
    (cherry picked from commit 8c528c4dc427c8456b3bdff8a35b273bba1d31c3)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 60 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 10 ++--
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2c780cc999c..cb9a4c3c104 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -66,6 +66,66 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Test
+    public void testDeadLetterTopicWithMessageKey() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 1;
+
+        final int sendMessages = 100;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.newMessage()
+                    .key("test-key")
+                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
+                    .send();
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            assertEquals(message.getKey(), "test-key");
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+    }
+
+
     @Test(groups = "quarantine")
     public void testDeadLetterTopic() throws Exception {
         final String topic = "persistent://my-property/my-ns/dead-letter-topic";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4d59064a035..eea0d603e85 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1961,10 +1961,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 for (MessageImpl<T> message : finalDeadLetterMessages) {
                     String originMessageIdStr = getOriginMessageIdStr(message);
                     String originTopicNameStr = getOriginTopicNameStr(message);
-                    producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                    TypedMessageBuilder<byte[]> typedMessageBuilderNew =
+                            producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
                             .value(message.getData())
-                            .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr))
-                            .sendAsync()
+                            .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
+                    if (message.hasKey()) {
+                        typedMessageBuilderNew.key(message.getKey());
+                    }
+                    typedMessageBuilderNew.sendAsync()
                             .thenAccept(messageIdInDLQ -> {
                                 possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
                                 acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {


[pulsar] 05/12: [fix][broker] BadVersionException when splitting bundles, delay 100ms and try again (#16612)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 14a10b2ee246cc297e3c60485465a919a6a4a785
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Mon Jul 18 09:50:42 2022 +0800

    [fix][broker] BadVersionException when splitting bundles, delay 100ms and try again (#16612)
    
    (cherry picked from commit 630c7589bbba46a0b99327caa4e28664784b8bc2)
---
 .../java/org/apache/pulsar/broker/namespace/NamespaceService.java   | 6 ++++--
 .../src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java  | 6 +++---
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5912dca1339..cf7ac381d43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.namespace;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import com.google.common.collect.Lists;
@@ -893,9 +894,10 @@ public class NamespaceService implements AutoCloseable {
                     // retry several times on BadVersion
                     if ((t.getCause() instanceof MetadataStoreException.BadVersionException)
                             && (counter.decrementAndGet() >= 0)) {
-                        pulsar.getOrderedExecutor()
+                        pulsar.getExecutor().schedule(() -> pulsar.getOrderedExecutor()
                                 .execute(() -> splitAndOwnBundleOnceAndRetry(
-                                        bundle, unload, counter, completionFuture, splitAlgorithm));
+                                        bundle, unload, counter, completionFuture, splitAlgorithm)),
+                                100, MILLISECONDS);
                     } else if (t instanceof IllegalArgumentException) {
                         completionFuture.completeExceptionally(t);
                     } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index c10f34288cb..3c5d629d2c0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -1502,7 +1502,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         try {
             admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, null);
         } catch (Exception e) {
-            fail("split bundle shouldn't have thrown exception");
+            fail("split bundle shouldn't have thrown exception", e);
         }
 
         // bundle-factory cache must have updated split bundles
@@ -1530,7 +1530,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
                 f.get();
             }
         } catch (Exception e) {
-            fail("split bundle shouldn't have thrown exception");
+            fail("split bundle shouldn't have thrown exception", e);
         }
 
         Awaitility.await().untilAsserted(() ->
@@ -1565,7 +1565,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
                 f.get();
             }
         } catch (Exception e) {
-            fail("split bundle shouldn't have thrown exception");
+            fail("split bundle shouldn't have thrown exception", e);
         }
         Awaitility.await().untilAsserted(() ->
                 assertEquals(bundleFactory.getBundles(NamespaceName.get(namespace)).getBundles().size(), 8));


[pulsar] 10/12: [fix][proxy] Do not preserve host when forwarding admin requests. (#16342)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 37c4873ce182d932ed3936046cfca54c5fd62830
Author: Yang Yang <yy...@streamnative.io>
AuthorDate: Thu Jul 21 15:05:49 2022 +0800

    [fix][proxy] Do not preserve host when forwarding admin requests. (#16342)
    
    (cherry picked from commit e8ee8aff448e6d0c329e99c8bda47c3b45228880)
---
 .../main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java    | 1 -
 .../java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java | 1 -
 .../apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java | 1 -
 .../org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java    | 1 -
 4 files changed, 4 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 4df36229b01..33cfa45b106 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -254,7 +254,6 @@ public class ProxyServiceStarter {
 
         AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
-        servletHolder.setInitParameter("preserveHost", "true");
         server.addServlet("/admin", servletHolder);
         server.addServlet("/lookup", servletHolder);
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index 735184628cd..a4a3f01284e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -109,7 +109,6 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
         doReturn(report).when(discoveryProvider).nextBroker();
 
         ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
-        servletHolder.setInitParameter("preserveHost", "true");
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index fdc7f5c739d..9f0eb0998cc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -106,7 +106,6 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas
         doReturn(report).when(discoveryProvider).nextBroker();
 
         ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
-        servletHolder.setInitParameter("preserveHost", "true");
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 972655fe9d4..8bf19c61bc4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -83,7 +83,6 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
         discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource));
         adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider);
         ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
-        servletHolder.setInitParameter("preserveHost", "true");
         webServer.addServlet("/admin", servletHolder);
         webServer.addServlet("/lookup", servletHolder);
 


[pulsar] 01/12: [fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a600b1483496cb2148bb7a3faeda5c9b2ca70b1b
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Jul 1 16:53:31 2022 +0800

    [fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)
    
    (cherry picked from commit 172a84624f47bb722e6b419de4e239fbb8ecc154)
---
 .../functions/worker/rest/api/ComponentImpl.java   | 13 ++--
 .../tests/integration/io/TestByteStateSource.java  | 55 +++++++++++++++++
 .../integration/functions/PulsarStateTest.java     | 69 ++++++++++++++++++++--
 3 files changed, 126 insertions(+), 11 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 850f94bbbf7..0209248916b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -28,7 +28,7 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace
 import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
 import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
 import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-
+import com.google.common.base.Utf8;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -1109,10 +1109,13 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                     if (kv.isNumber()) {
                         value = new FunctionState(key, null, null, kv.numberValue(), kv.version());
                     } else {
-                        try {
-                            value = new FunctionState(key, new String(ByteBufUtil.getBytes(kv.value(), kv.value().readerIndex(), kv.value().readableBytes()), UTF_8), null, null, kv.version());
-                        } catch (Exception e) {
-                            value = new FunctionState(key, null, ByteBufUtil.getBytes(kv.value()), null, kv.version());
+                        byte[] bytes = ByteBufUtil.getBytes(kv.value());
+                        if (Utf8.isWellFormed(bytes)) {
+                            value = new FunctionState(key, new String(bytes, UTF_8),
+                                    null, null, kv.version());
+                        } else {
+                            value = new FunctionState(
+                                    key, null, bytes, null, kv.version());
                         }
                     }
                 }
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java
new file mode 100644
index 00000000000..4fe382f5ce7
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+public class TestByteStateSource implements Source<byte[]> {
+
+    private SourceContext sourceContext;
+
+    public static final String VALUE_BASE64 = "0a8001127e0a172e6576656e74732e437573746f6d65724372656174656412630a243"
+                                              + "2336366666263652d623038342d346631352d616565342d326330643135356131666"
+                                              + "36312026e311a3700000000000000000000000000000000000000000000000000000"
+                                              + "000000000000000000000000000000000000000000000000000000000";
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        sourceContext.putState("initial", ByteBuffer.wrap(Base64.getDecoder().decode(VALUE_BASE64)));
+        this.sourceContext = sourceContext;
+    }
+
+    @Override
+    public Record<byte[]> read() throws Exception {
+        Thread.sleep(50);
+        ByteBuffer initial = sourceContext.getState("initial");
+        sourceContext.putState("now", initial);
+        return initial::array;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 0582a095c85..1dead726f85 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -18,6 +18,16 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.base.Utf8;
+import java.util.Base64;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -40,12 +50,6 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 /**
  * State related test cases.
  */
@@ -57,6 +61,11 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
 
     public static final String WORDCOUNT_PYTHON_FILE = "wordcount_function.py";
 
+    public static final String VALUE_BASE64 = "0a8001127e0a172e6576656e74732e437573746f6d65724372656174656412630a243"
+                                              + "2336366666263652d623038342d346631352d616565342d326330643135356131666"
+                                              + "36312026e311a3700000000000000000000000000000000000000000000000000000"
+                                              + "000000000000000000000000000000000000000000000000000000000";
+
     @Test(groups = {"python_state", "state", "function", "python_function"})
     public void testPythonWordCountFunction() throws Exception {
         String inputTopicName = "test-wordcount-py-input-" + randomName(8);
@@ -183,6 +192,54 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
         getSinkInfoNotFound(sinkName);
     }
 
+    @Test(groups = {"java_state", "state", "function", "java_function"})
+    public void testBytes2StringNotUTF8() {
+        byte[] valueBytes = Base64.getDecoder().decode(VALUE_BASE64);
+        assertFalse(Utf8.isWellFormed(valueBytes));
+        assertNotEquals(valueBytes, new String(valueBytes, UTF_8).getBytes(UTF_8));
+    }
+
+    @Test(groups = {"java_state", "state", "function", "java_function"})
+    public void testSourceByteState() throws Exception {
+        String outputTopicName = "test-state-source-output-" + randomName(8);
+        String sourceName = "test-state-source-" + randomName(8);
+
+        submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestByteStateSource",  JAVAJAR);
+
+        // get source info
+        getSourceInfoSuccess(sourceName);
+
+        // get source status
+        getSourceStatus(sourceName);
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                assertEquals(status.getInstances().size(), 1);
+                assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
+            });
+
+            {
+                FunctionState functionState =
+                        admin.functions().getFunctionState("public", "default", sourceName, "initial");
+                assertNull(functionState.getStringValue());
+                assertEquals(functionState.getByteValue(), Base64.getDecoder().decode(VALUE_BASE64));
+            }
+
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
+                assertNull(functionState.getStringValue());
+                assertEquals(functionState.getByteValue(), Base64.getDecoder().decode(VALUE_BASE64));
+            });
+        }
+
+        // delete source
+        deleteSource(sourceName);
+
+        getSourceInfoNotFound(sourceName);
+    }
+
     private void submitSourceConnector(String sourceName,
                                          String outputTopicName,
                                          String className,


[pulsar] 11/12: [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0ab33bf301355f84344a3cdc39df1ccac3a54e39
Author: Penghui Li <pe...@apache.org>
AuthorDate: Fri Jul 22 15:24:40 2022 +0800

    [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718)
    
    (cherry picked from commit fd9c418147503ec226f2c87e187a58af6f601b7d)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java |  6 +++++-
 .../pulsar/client/api/SimpleProducerConsumerTest.java   | 17 ++++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 0a402d8322a..60762d8400a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -236,7 +236,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             Consumer consumer = current.getKey();
             List<Entry> entriesWithSameKey = current.getValue();
             int entriesWithSameKeyCount = entriesWithSameKey.size();
-            final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
+            int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
+            if (consumer != null && consumer.getMaxUnackedMessages() > 0) {
+                availablePermits = Math.min(availablePermits,
+                        consumer.getMaxUnackedMessages() - consumer.getUnackedMessages());
+            }
             int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
             int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
                     readType, consumerStickyKeyHashesMap.get(consumer));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index c90aa577961..8a6de0652fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -148,6 +148,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         return new Object[][] { { true }, { false } };
     }
 
+    @DataProvider(name = "ackReceiptEnabledAndSubscriptionTypes")
+    public Object[][] ackReceiptEnabledAndSubscriptionTypes() {
+        return new Object[][] {
+                {true, SubscriptionType.Shared},
+                {true, SubscriptionType.Key_Shared},
+                {false, SubscriptionType.Shared},
+                {false, SubscriptionType.Key_Shared},
+        };
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
@@ -1590,8 +1600,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
     }
 
-    @Test(dataProvider = "ackReceiptEnabled")
-    public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException {
+    @Test(dataProvider = "ackReceiptEnabledAndSubscriptionTypes")
+    public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled, SubscriptionType subType)
+            throws PulsarClientException {
         final int maxUnacks = 10;
         pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
         final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";
@@ -1599,7 +1610,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic).subscriptionName("sub")
-                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionType(subType)
                 .isAckReceiptEnabled(ackReceiptEnabled)
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
                 .subscribe();


[pulsar] 08/12: [fix][broker] Retry to delete the namespace if new topics created during the namespace deletion (#16676)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4da2ccf89dbf5946d8dab172e19046a115e78375
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Jul 20 23:11:44 2022 +0800

    [fix][broker] Retry to delete the namespace if new topics created during the namespace deletion (#16676)
    
    (cherry picked from commit 9077a73b30ea1ad0b8fa2f0cda589797ffcece2e)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 26 +++++++++++++++-------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 06e54831461..cfb27a629f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -347,8 +347,15 @@ public abstract class NamespacesBase extends AdminResource {
             asyncResponse.resume(Response.noContent().build());
         })
         .exceptionally(ex -> {
-            log.error("[{}] Failed to remove namespace {}", clientAppId(), namespaceName, ex.getCause());
-            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.error("[{}] Failed to remove namespace {}", clientAppId(), namespaceName, cause);
+            if (cause instanceof PulsarAdminException.ConflictException) {
+                log.info("[{}] There are new topics created during the namespace deletion, "
+                        + "retry to delete the namespace again.", namespaceName);
+                pulsar().getExecutor().execute(() -> internalDeleteNamespace(asyncResponse, authoritative));
+            } else {
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+            }
             return null;
         });
     }
@@ -539,15 +546,18 @@ public abstract class NamespacesBase extends AdminResource {
 
         FutureUtil.waitForAll(bundleFutures).thenCompose(__ -> internalClearZkSources()).handle((result, exception) -> {
             if (exception != null) {
-                if (exception.getCause() instanceof PulsarAdminException) {
-                    asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
-                    return null;
+                Throwable cause = FutureUtil.unwrapCompletionException(exception);
+                if (cause instanceof PulsarAdminException.ConflictException) {
+                    log.info("[{}] There are new topics created during the namespace deletion, "
+                            + "retry to force delete the namespace again.", namespaceName);
+                    pulsar().getExecutor().execute(() ->
+                            internalDeleteNamespaceForcefully(asyncResponse, authoritative));
                 } else {
                     log.error("[{}] Failed to remove forcefully owned namespace {}",
-                            clientAppId(), namespaceName, exception);
-                    asyncResponse.resume(new RestException(exception.getCause()));
-                    return null;
+                            clientAppId(), namespaceName, cause);
+                    asyncResponse.resume(new RestException(cause));
                 }
+                return null;
             }
             asyncResponse.resume(Response.noContent().build());
             return null;


[pulsar] 07/12: [fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription (#16670)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8840591ea8189f414f0fbd403dc59c68d9dd7e4e
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Jul 20 10:51:35 2022 +0800

    [fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription (#16670)
    
    (cherry picked from commit 42fe0603518be7db7a14802eb4274b6ea22b0c9a)
---
 .../PersistentDispatcherMultipleConsumers.java     |  3 ++
 .../client/api/SimpleProducerConsumerTest.java     | 40 ++++++++++++++++++++++
 .../client/api/v1/V1_ProducerConsumerTest.java     |  2 +-
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 33105ac5a3a..a17f0533a10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -557,6 +557,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
             // round-robin dispatch batch size for this consumer
             int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
+            if (c.getMaxUnackedMessages() > 0) {
+                availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages());
+            }
             if (log.isDebugEnabled() && !c.isWritable()) {
                 log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
                                 + "availablePermits are {}", topic.getName(), name,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ac0186c3c5a..c90aa577961 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -1590,6 +1590,46 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(dataProvider = "ackReceiptEnabled")
+    public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException {
+        final int maxUnacks = 10;
+        pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
+        final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(ackReceiptEnabled)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false)
+                .topic(topic)
+                .create();
+
+        final int messages = 1000;
+        for (int i = 0; i < messages; i++) {
+            producer.sendAsync("Message - " + i);
+        }
+        producer.flush();
+        List<MessageId> receives = new ArrayList<>();
+        for (int i = 0; i < maxUnacks; i++) {
+            Message<String> received =  consumer.receive();
+            log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
+            receives.add(received.getMessageId());
+        }
+        assertNull(consumer.receive(3, TimeUnit.SECONDS));
+        consumer.acknowledge(receives);
+        for (int i = 0; i < messages - maxUnacks; i++) {
+            Message<String> received =  consumer.receive();
+            log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
+            consumer.acknowledge(received);
+        }
+    }
+
     /**
      * Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
      *
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 55c120592e1..e4cb941c650 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -1708,7 +1708,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
             }
 
             // client should not receive all produced messages and should be blocked due to unack-messages
-            assertEquals(messages1.size(), receiverQueueSize);
+            assertEquals(messages1.size(), unAckedMessagesBufferSize);
             Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
                 return (MessageIdImpl) m.getMessageId();
             }).collect(Collectors.toSet());


[pulsar] 09/12: [improve][security] Upgrade aws-java-sdk-s3 to 1.12.261 (#16684)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 35eec7782972eb1d61623f7bf6a7427395c11647
Author: tison <wa...@gmail.com>
AuthorDate: Wed Jul 20 23:23:47 2022 +0800

    [improve][security] Upgrade aws-java-sdk-s3 to 1.12.261 (#16684)
    
    (cherry picked from commit e5183c3f418c7eec8e841106e2c8b06baa1dc4b5)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index c1e91dfbe84..90da5d763bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,7 +144,7 @@ flexible messaging model and an intuitive client API.</description>
     <aerospike-client.version>4.4.20</aerospike-client.version>
     <kafka-client.version>2.7.2</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
-    <aws-sdk.version>1.11.774</aws-sdk.version>
+    <aws-sdk.version>1.12.262</aws-sdk.version>
     <avro.version>1.10.2</avro.version>
     <joda.version>2.10.5</joda.version>
     <jclouds.version>2.5.0</jclouds.version>


[pulsar] 06/12: [fix][broker] The configuration loadBalancerNamespaceMaximumBundles is invalid (#16552)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a0133e91c5bede8ebebe4f4263109a93930662e7
Author: LinChen <15...@qq.com>
AuthorDate: Mon Jul 18 22:13:06 2022 +0800

    [fix][broker] The configuration loadBalancerNamespaceMaximumBundles is invalid (#16552)
    
    (cherry picked from commit 5698b08d57f5497b355aa61ac33e7f1303f1ca8e)
---
 .../loadbalance/impl/BundleSplitterTask.java       | 12 ++++-
 .../loadbalance/impl/BundleSplitterTaskTest.java   | 52 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index 751203ca124..2203dbafc37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -39,12 +40,16 @@ public class BundleSplitterTask implements BundleSplitStrategy {
     private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class);
     private final Set<String> bundleCache;
 
+    private final Map<String, Integer> namespaceBundleCount;
+
+
     /**
      * Construct a BundleSplitterTask.
      *
      */
     public BundleSplitterTask() {
         bundleCache = new HashSet<>();
+        namespaceBundleCount = new HashMap<>();
     }
 
     /**
@@ -61,12 +66,14 @@ public class BundleSplitterTask implements BundleSplitStrategy {
     @Override
     public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) {
         bundleCache.clear();
+        namespaceBundleCount.clear();
         final ServiceConfiguration conf = pulsar.getConfiguration();
         int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
         long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
         long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
         long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
         long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
+
         loadData.getBrokerData().forEach((broker, brokerData) -> {
             LocalBrokerData localData = brokerData.getLocalData();
             for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
@@ -91,8 +98,11 @@ public class BundleSplitterTask implements BundleSplitStrategy {
                     try {
                         final int bundleCount = pulsar.getNamespaceService()
                                 .getBundleCount(NamespaceName.get(namespace));
-                        if (bundleCount < maxBundleCount) {
+                        if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0))
+                                < maxBundleCount) {
                             bundleCache.add(bundle);
+                            int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
+                            namespaceBundleCount.put(namespace, bundleNum + 1);
                         } else {
                             log.warn(
                                     "Could not split namespace bundle {} because namespace {} has too many bundles: {}",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 7480989bbb5..9ff266ba96c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TimeAverageMessageData;
 import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -94,6 +95,57 @@ public class BundleSplitterTaskTest {
         Assert.assertEquals(bundlesToSplit.size(), 0);
     }
 
+    @Test
+    public void testLoadBalancerNamespaceMaximumBundles() throws Exception {
+        pulsar.getConfiguration().setLoadBalancerNamespaceMaximumBundles(3);
+
+        final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
+        LoadData loadData = new LoadData();
+
+        LocalBrokerData brokerData = new LocalBrokerData();
+        Map<String, NamespaceBundleStats> lastStats = new HashMap<>();
+        final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
+        namespaceBundleStats.topics = 5;
+        lastStats.put("ten/ns/0x00000000_0x20000000", namespaceBundleStats);
+
+        final NamespaceBundleStats namespaceBundleStats2 = new NamespaceBundleStats();
+        namespaceBundleStats2.topics = 5;
+        lastStats.put("ten/ns/0x20000000_0x40000000", namespaceBundleStats2);
+
+        final NamespaceBundleStats namespaceBundleStats3 = new NamespaceBundleStats();
+        namespaceBundleStats3.topics = 5;
+        lastStats.put("ten/ns/0x40000000_0x60000000", namespaceBundleStats3);
+
+        brokerData.setLastStats(lastStats);
+        loadData.getBrokerData().put("broker", new BrokerData(brokerData));
+
+        BundleData bundleData1 = new BundleData();
+        TimeAverageMessageData averageMessageData1 = new TimeAverageMessageData();
+        averageMessageData1.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+        averageMessageData1.setMsgRateOut(1);
+        bundleData1.setLongTermData(averageMessageData1);
+        loadData.getBundleData().put("ten/ns/0x00000000_0x20000000", bundleData1);
+
+        BundleData bundleData2 = new BundleData();
+        TimeAverageMessageData averageMessageData2 = new TimeAverageMessageData();
+        averageMessageData2.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+        averageMessageData2.setMsgRateOut(1);
+        bundleData2.setLongTermData(averageMessageData2);
+        loadData.getBundleData().put("ten/ns/0x20000000_0x40000000", bundleData2);
+
+        BundleData bundleData3 = new BundleData();
+        TimeAverageMessageData averageMessageData3 = new TimeAverageMessageData();
+        averageMessageData3.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
+        averageMessageData3.setMsgRateOut(1);
+        bundleData3.setLongTermData(averageMessageData3);
+        loadData.getBundleData().put("ten/ns/0x40000000_0x60000000", bundleData3);
+
+        int currentBundleCount = pulsar.getNamespaceService().getBundleCount(NamespaceName.get("ten/ns"));
+        final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+        Assert.assertEquals(bundlesToSplit.size() + currentBundleCount,
+                pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles());
+    }
+
 
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {


[pulsar] 04/12: [enh] Broker/EntryFilter: make the delay for RESCHEDULED messages configurable (dispatcherFilterRescheduledMessageDelay) (#16602)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea6c3cc33b4f0ca2d36c51ed110eacb57809d508
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Jul 15 14:02:15 2022 +0200

    [enh] Broker/EntryFilter: make the delay for RESCHEDULED messages configurable (dispatcherFilterRescheduledMessageDelay) (#16602)
    
    (cherry picked from commit 807a283cacdec6fcf2367e3865187e986143025b)
---
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java   | 7 +++++++
 .../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java   | 2 +-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a94aaea2c92..ad1fa8ff574 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -980,6 +980,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_SERVER,
+            doc = "Time in milliseconds to delay the new delivery of a message when an EntryFilter returns RESCHEDULE."
+    )
+    private int dispatcherEntryFilterRescheduledMessageDelay = 1000;
+
     @FieldContext(
         dynamic = true,
         category = CATEGORY_SERVER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 3b79c9e27de..7f343617eca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -240,7 +240,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                         // simulate the Consumer rejected the message
                         subscription
                                 .redeliverUnacknowledgedMessages(consumer, entriesToRedeliver);
-                    }, 1, TimeUnit.SECONDS);
+                    }, serviceConfig.getDispatcherEntryFilterRescheduledMessageDelay(), TimeUnit.MILLISECONDS);
 
         }
 


[pulsar] 12/12: Fix newLookup TooManyRequestsException message (#16594)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 98d1cc94d7a5acee12007792197b1b893b63cb71
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Jul 25 10:42:35 2022 +0800

    Fix newLookup TooManyRequestsException message (#16594)
    
    (cherry picked from commit 595bb55b8537ccb0da12dc7741a0e21fb30c23aa)
---
 .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java          | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 322138699d5..033365b362b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -774,7 +774,7 @@ public class ClientCnx extends PulsarHandler {
                 future.completeExceptionally(new PulsarClientException.TooManyRequestsException(String.format(
                     "Requests number out of config: There are {%s} lookup requests outstanding and {%s} requests"
                             + " pending.",
-                    pendingLookupRequestSemaphore.availablePermits(),
+                    pendingLookupRequestSemaphore.getQueueLength(),
                     waitingLookupRequests.size())));
             }
         }


[pulsar] 02/12: Update/fix Swagger Annotation for param: authoritative (#16222)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aef1854c53f3ce482e0d8f467a047fe4ed221774
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue Jul 12 18:58:26 2022 -0500

    Update/fix Swagger Annotation for param: authoritative (#16222)
    
    * Update/fix Swagger Annotation for param: authoritative
    
    * Fix Checkstyle
    
    (cherry picked from commit b4ef4a3f4b752749277ae460d7e0739cf32672bc)
---
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  12 +-
 .../broker/admin/v2/NonPersistentTopics.java       |  10 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 229 +++++++++++----------
 3 files changed, 127 insertions(+), 124 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 657f73824aa..c37ce8186c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -205,7 +205,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateNamespaceName(tenant, cluster, namespace);
         validateTopicName(tenant, cluster, namespace, encodedTopic);
@@ -240,7 +240,7 @@ public class PersistentTopics extends PersistentTopicsBase {
     public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("force") @DefaultValue("false") boolean force,
             int numPartitions) {
@@ -548,7 +548,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Subscription to be Expiry messages on")
             @PathParam("subName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
                     ResetCursorData resetCursorData) {
@@ -864,7 +864,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, cluster, namespace, encodedTopic);
@@ -899,7 +899,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Name of subscription", required = true)
             @PathParam("subName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Whether to enable replicated subscription", required = true)
             boolean enabled) {
@@ -934,7 +934,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Name of subscription", required = true)
             @PathParam("subName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, cluster, namespace, encodedTopic);
         internalGetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 66867b68426..d6392b564d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -95,7 +95,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Is check configuration required to automatically create topic")
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
@@ -156,7 +156,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -235,7 +235,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Get per partition stats")
             @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "If return precise backlog or imprecise backlog")
             @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@@ -346,7 +346,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -528,7 +528,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative){
         asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED.getStatusCode(),
                 "unsupport truncate"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3f1350105ed..c871446b58a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -317,7 +317,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Key value pair properties for the topic metadata")
             Map<String, String> properties) {
@@ -340,7 +340,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -361,7 +361,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
@@ -385,7 +385,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -409,7 +409,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -431,7 +431,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
                     Integer maxUnackedNum) {
@@ -455,7 +455,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -478,7 +478,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -506,7 +506,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Interval to take deduplication snapshot for the specified topic")
                     Integer interval,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -528,7 +528,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -552,7 +552,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -572,7 +572,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "inactive topic policies for the specified topic")
@@ -597,7 +597,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -621,7 +621,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -644,7 +644,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Max unacked messages on subscription policies for the specified topic")
                     Integer maxUnackedNum) {
@@ -671,7 +671,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
@@ -696,7 +696,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -719,7 +719,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Delayed delivery policies for the specified topic")
                     DelayedDeliveryPolicies deliveryPolicies) {
@@ -747,7 +747,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         validatePoliciesReadOnlyAccess();
@@ -799,7 +799,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("force") @DefaultValue("false") boolean force,
             @ApiParam(value = "The number of partitions for the topic",
@@ -860,7 +860,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Is check configuration required to automatically create topic")
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
@@ -888,7 +888,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validatePersistentTopicName(tenant, namespace, encodedTopic);
         internalGetPropertiesAsync(authoritative)
@@ -926,7 +926,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Stop all producer/consumer/replicator and delete topic forcefully",
                     defaultValue = "false", type = "boolean")
             @QueryParam("force") @DefaultValue("false") boolean force,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Delete the topic's schema storage")
             @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
@@ -959,7 +959,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -994,7 +994,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Stop all producer/consumer/replicator and delete topic forcefully",
                     defaultValue = "false", type = "boolean")
             @QueryParam("force") @DefaultValue("false") boolean force,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Delete the topic's schema storage")
             @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
@@ -1022,7 +1022,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1052,7 +1052,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "If return precise backlog or imprecise backlog")
             @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@@ -1083,7 +1083,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -1105,7 +1105,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
             @PathParam("namespace") String namespace,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic")
@@ -1136,7 +1136,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Get per partition stats")
             @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "If return precise backlog or imprecise backlog")
             @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@@ -1175,7 +1175,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1214,7 +1214,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Disconnect and close all consumers and delete subscription forcefully",
                     defaultValue = "false", type = "boolean")
             @QueryParam("force") @DefaultValue("false") boolean force,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1251,7 +1251,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Name of subscription")
             @PathParam("subName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1287,7 +1287,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("subName") String encodedSubName,
             @ApiParam(value = "The number of messages to skip", defaultValue = "0")
             @PathParam("numMessages") int numMessages,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1323,7 +1323,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("subName") String encodedSubName,
             @ApiParam(value = "Expires beyond the specified number of seconds", defaultValue = "0")
             @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1358,7 +1358,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Subscription to be Expiry messages on")
             @PathParam("subName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
             ResetCursorData resetCursorData) {
@@ -1398,7 +1398,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Expires beyond the specified number of seconds", defaultValue = "0")
             @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1434,7 +1434,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String topic,
             @ApiParam(value = "Subscription to create position on", required = true)
             @PathParam("subscriptionName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(name = "messageId", value = "messageId where to create the subscription. "
                     + "It can be 'latest', 'earliest' or (ledgerId:entryId)",
@@ -1493,7 +1493,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("subName") String encodedSubName,
             @ApiParam(value = "the timestamp to reset back")
             @PathParam("timestamp") long timestamp,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1529,7 +1529,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Subscription to update", required = true)
             @PathParam("subName") String encodedSubName,
             @ApiParam(value = "The new properties") Map<String, String> subscriptionProperties,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1566,7 +1566,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(name = "subName", value = "Subscription to reset position on", required = true)
             @PathParam("subName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
                     ResetCursorData resetCursorData) {
@@ -1605,7 +1605,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("subName") String encodedSubName,
             @ApiParam(value = "The number of messages (default 1)", defaultValue = "1")
             @PathParam("messagePosition") int messagePosition,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
@@ -1637,7 +1637,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("initialPosition") String initialPosition,
             @ApiParam(value = "The position of messages (default 1)", defaultValue = "1")
             @QueryParam("messagePosition") long messagePosition,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalExamineMessage(initialPosition, messagePosition, authoritative);
@@ -1668,7 +1668,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("ledgerId") long ledgerId,
             @ApiParam(value = "The entry id", required = true)
             @PathParam("entryId") long entryId,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -1703,7 +1703,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Specify the timestamp", required = true)
             @PathParam("timestamp") long timestamp,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         internalGetMessageIdByTimestamp(timestamp, authoritative)
@@ -1736,7 +1736,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalGetBacklog(authoritative);
@@ -1759,7 +1759,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
         validateTopicName(tenant, namespace, encodedTopic);
         internalGetBacklogSizeByMessageId(asyncResponse, messageId, authoritative);
@@ -1778,7 +1778,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -1805,7 +1805,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType, BacklogQuotaImpl backlogQuota) {
@@ -1831,7 +1831,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -1856,7 +1856,8 @@ public class PersistentTopics extends PersistentTopicsBase {
                               @PathParam("namespace") String namespace,
                               @PathParam("topic") @Encoded String encodedTopic,
                               @QueryParam("applied") @DefaultValue("false") boolean applied,
-                              @ApiParam(value = "Is authentication required to perform this operation")
+                              @ApiParam(value = "Whether leader broker redirected this call to this broker. "
+                                      + "For internal use.")
                               @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -1888,7 +1889,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "List of replication clusters", required = true) List<String> clusterIds) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -1913,7 +1914,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -1938,7 +1939,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -1975,7 +1976,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "TTL in seconds for the specified namespace", required = true)
             @QueryParam("messageTTL") Integer messageTTL,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2001,7 +2002,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -2028,7 +2029,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2052,8 +2053,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "DeduplicationEnabled policies for the specified topic")
                     Boolean enabled) {
@@ -2079,7 +2080,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -2106,7 +2107,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2131,7 +2132,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
@@ -2168,8 +2169,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2201,7 +2202,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2226,7 +2227,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Bookkeeper persistence policies for specified topic")
@@ -2265,7 +2266,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2296,7 +2297,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2323,7 +2324,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -2354,7 +2355,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2384,7 +2385,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2410,7 +2411,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -2441,7 +2442,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2471,7 +2472,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2496,7 +2497,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "The max producers of the topic") int maxProducers) {
@@ -2530,7 +2531,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2562,7 +2563,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2588,7 +2589,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "The max consumers of the topic") int maxConsumers) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -2621,7 +2622,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2652,7 +2653,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2680,7 +2681,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "The max message size of the topic") int maxMessageSize) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -2715,7 +2716,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2756,7 +2757,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validatePersistentTopicName(tenant, namespace, encodedTopic);
         return internalTerminate(authoritative);
@@ -2782,7 +2783,8 @@ public class PersistentTopics extends PersistentTopicsBase {
                                           @PathParam("namespace") String namespace,
                                           @ApiParam(value = "Specify topic name", required = true)
                                           @PathParam("topic") @Encoded String encodedTopic,
-                                          @ApiParam(value = "Is authentication required to perform this operation")
+                                          @ApiParam(value = "Whether leader broker redirected this call to this broker."
+                                                  + " For internal use.")
                                           @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         internalTerminatePartitionedTopic(asyncResponse, authoritative);
@@ -2810,7 +2812,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -2842,7 +2844,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalCompactionStatus(authoritative);
@@ -2871,7 +2873,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
                                MessageIdImpl messageId) {
         try {
@@ -2908,7 +2910,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -2941,7 +2943,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
@@ -2965,7 +2967,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -2989,7 +2991,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Dispatch rate for the specified topic") DispatchRateImpl dispatchRate) {
@@ -3027,7 +3029,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3060,7 +3062,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3085,7 +3087,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Subscription message dispatch rate for the specified topic")
@@ -3124,7 +3126,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3157,7 +3159,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3181,7 +3183,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
@@ -3219,7 +3221,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3251,7 +3253,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3278,7 +3280,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -3315,7 +3317,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3348,7 +3350,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3374,7 +3376,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -3412,7 +3414,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3444,8 +3446,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3472,8 +3474,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Enable sub types for the specified topic")
             Set<SubscriptionType> subscriptionTypesEnabled) {
@@ -3510,8 +3512,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3543,7 +3545,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
@@ -3568,7 +3570,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -3606,6 +3608,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
         validateTopicName(tenant, namespace, encodedTopic);
@@ -3646,7 +3649,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative){
         validateTopicName(tenant, namespace, encodedTopic);
         internalTruncateTopic(asyncResponse, authoritative);
@@ -3676,7 +3679,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Name of subscription", required = true)
             @PathParam("subName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Whether to enable replicated subscription", required = true)
             boolean enabled) {
@@ -3709,7 +3712,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Name of subscription", required = true)
             @PathParam("subName") String encodedSubName,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         internalGetReplicatedSubscriptionStatus(asyncResponse, decode(encodedSubName), authoritative);
@@ -3732,7 +3735,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") @DefaultValue("false") boolean applied,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
 
@@ -3761,7 +3764,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Strategy used to check the compatibility of new schema")
                     SchemaCompatibilityStrategy strategy) {
@@ -3801,7 +3804,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
-            @ApiParam(value = "Is authentication required to perform this operation")
+            @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Strategy used to check the compatibility of new schema")
                     SchemaCompatibilityStrategy strategy) {