You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/12/08 00:26:34 UTC

[pulsar] branch master updated: [CheckStyle]Make codestyle of module `pulsar-broker` gradually conform CheckStyle (#8848)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e89f96  [CheckStyle]Make codestyle  of module `pulsar-broker` gradually conform CheckStyle (#8848)
0e89f96 is described below

commit 0e89f967dda9718b5d00d0daa982b14d1d55bffd
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Tue Dec 8 08:26:05 2020 +0800

    [CheckStyle]Make codestyle  of module `pulsar-broker` gradually conform CheckStyle (#8848)
    
    * add provided jclouds
    
    Signed-off-by: Renkai <ga...@gmail.com>
    
    * make broker-style gradually conform checkstyle
    
    Signed-off-by: Renkai <ga...@gmail.com>
    
    * make broker-style gradually conform checkstyle
    
    Signed-off-by: Renkai <ga...@gmail.com>
    
    * make broker-style gradually conform checkstyle
    
    Signed-off-by: Renkai <ga...@gmail.com>
    
    * make broker-style gradually conform checkstyle
    
    Signed-off-by: Renkai <ga...@gmail.com>
---
 .../src/main/resources/pulsar/suppressions.xml     |   1 -
 .../pulsar/broker/admin/impl/BrokerStatsBase.java  |  32 +-
 .../pulsar/broker/admin/impl/BrokersBase.java      |  28 +-
 .../pulsar/broker/admin/impl/ClustersBase.java     |  15 +-
 .../pulsar/broker/admin/impl/FunctionsBase.java    | 425 ++++++++--------
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 436 ++++++++++-------
 .../broker/admin/impl/PersistentTopicsBase.java    | 532 +++++++++++++--------
 .../apache/pulsar/broker/admin/impl/SinksBase.java | 320 +++++++------
 .../pulsar/broker/admin/impl/SourcesBase.java      | 200 ++++----
 .../pulsar/broker/admin/impl/TenantsBase.java      |  11 +-
 10 files changed, 1160 insertions(+), 840 deletions(-)

diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml
index 66b3a4e..3294a87 100644
--- a/buildtools/src/main/resources/pulsar/suppressions.xml
+++ b/buildtools/src/main/resources/pulsar/suppressions.xml
@@ -48,7 +48,6 @@
     <suppress checks="ConstantName" files="MessageId.java"/>
     <suppress checks="MethodName" files="TopicsImpl.java"/>
     <suppress checks="MemberName" files="TopicsImpl.java"/>
-    <suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/admin/impl/.*.java"/>
     <suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/cache/.*.java"/>
     <suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/delayed/.*.java"/>
     <suppress checks=".*" files="src/main/java/org/apache/pulsar/broker/intercept/.*.java"/>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
index 28a4934..e92b4dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokerStatsBase.java
@@ -52,7 +52,9 @@ public class BrokerStatsBase extends AdminResource {
 
     @GET
     @Path("/metrics")
-    @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Requested should be executed by Monitoring agent on each broker to fetch the metrics", response = Metrics.class, responseContainer = "List")
+    @ApiOperation(value = "Gets the metrics for Monitoring",
+            notes = "Requested should be executed by Monitoring agent on each broker to fetch the metrics",
+            response = Metrics.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
     public Collection<Metrics> getMetrics() throws Exception {
         // Ensure super user access only
@@ -68,7 +70,8 @@ public class BrokerStatsBase extends AdminResource {
 
     @GET
     @Path("/mbeans")
-    @ApiOperation(value = "Get all the mbean details of this broker JVM", response = Metrics.class, responseContainer = "List")
+    @ApiOperation(value = "Get all the mbean details of this broker JVM",
+            response = Metrics.class, responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
     public Collection<Metrics> getMBeans() throws Exception {
         // Ensure super user access only
@@ -84,10 +87,9 @@ public class BrokerStatsBase extends AdminResource {
 
     @GET
     @Path("/destinations")
-    @ApiOperation(value = "Get all the topic stats by namespace", response = OutputStream.class, responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
-                                                                                                                                           // map
-                                                                                                                                           // support
-                                                                                                                                           // missing
+    @ApiOperation(value = "Get all the topic stats by namespace", response = OutputStream.class,
+            responseContainer = "OutputStream") // https://github.com/swagger-api/swagger-ui/issues/558
+    // map support missing
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
     public StreamingOutput getTopics2() throws Exception {
         // Ensure super user access only
@@ -103,7 +105,8 @@ public class BrokerStatsBase extends AdminResource {
 
     @GET
     @Path("/allocator-stats/{allocator}")
-    @ApiOperation(value = "Get the stats for the Netty allocator. Available allocators are 'default' and 'ml-cache'", response = AllocatorStats.class)
+    @ApiOperation(value = "Get the stats for the Netty allocator. Available allocators are 'default' and 'ml-cache'",
+            response = AllocatorStats.class)
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
     public AllocatorStats getAllocatorStats(@PathParam("allocator") String allocatorName) throws Exception {
         // Ensure super user access only
@@ -121,15 +124,13 @@ public class BrokerStatsBase extends AdminResource {
 
     @GET
     @Path("/bookieops")
-    @ApiOperation(value = "Get pending bookie client op stats by namesapce", response = PendingBookieOpsStats.class, // https://github.com/swagger-api/swagger-core/issues/449
-                                                                                                                     // nested
-                                                                                                                     // containers
-                                                                                                                     // are
-                                                                                                                     // not
-                                                                                                                     // supported
+    @ApiOperation(value = "Get pending bookie client op stats by namesapce",
+            response = PendingBookieOpsStats.class,
+            // https://github.com/swagger-api/swagger-core/issues/449
+            // nested containers are not supported
             responseContainer = "Map")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public Map<String, Map<String, PendingBookieOpsStats>> getPendingBookieOpsStats() throws Exception {
+    public Map<String, Map<String, PendingBookieOpsStats>> getPendingBookieOpsStats() {
         // Ensure super user access only
         validateSuperUserAccess();
         try {
@@ -142,7 +143,8 @@ public class BrokerStatsBase extends AdminResource {
 
     @GET
     @Path("/load-report")
-    @ApiOperation(value = "Get Load for this broker", notes = "consists of topics stats & systemResourceUsage", response = LoadReport.class)
+    @ApiOperation(value = "Get Load for this broker", notes = "consists of topics stats & systemResourceUsage",
+            response = LoadReport.class)
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
     public LoadManagerReport getLoadReport() throws Exception {
         // Ensure super user access only
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index e3d9bf3..85549af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -99,11 +99,12 @@ public class BrokersBase extends AdminResource {
 
     @GET
     @Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
-    @ApiOperation(value = "Get the list of namespaces served by the specific broker", response = NamespaceOwnershipStatus.class, responseContainer = "Map")
+    @ApiOperation(value = "Get the list of namespaces served by the specific broker",
+            response = NamespaceOwnershipStatus.class, responseContainer = "Map")
     @ApiResponses(value = {
-        @ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"),
-        @ApiResponse(code = 403, message = "Don't have admin permission"),
-        @ApiResponse(code = 404, message = "Cluster doesn't exist") })
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Cluster doesn't exist") })
     public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(@PathParam("clusterName") String cluster,
             @PathParam("broker-webserviceurl") String broker) throws Exception {
         validateSuperUserAccess();
@@ -122,21 +123,24 @@ public class BrokersBase extends AdminResource {
 
     @POST
     @Path("/configuration/{configName}/{configValue}")
-    @ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
+    @ApiOperation(value =
+            "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
     @ApiResponses(value = {
-        @ApiResponse(code = 204, message = "Service configuration updated successfully"),
-        @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
-        @ApiResponse(code = 404, message = "Configuration not found"),
-        @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
-        @ApiResponse(code = 500, message = "Internal server error")})
-    public void updateDynamicConfiguration(@PathParam("configName") String configName, @PathParam("configValue") String configValue) throws Exception {
+            @ApiResponse(code = 204, message = "Service configuration updated successfully"),
+            @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
+            @ApiResponse(code = 404, message = "Configuration not found"),
+            @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    public void updateDynamicConfiguration(@PathParam("configName") String configName,
+                                           @PathParam("configValue") String configValue) throws Exception {
         validateSuperUserAccess();
         updateDynamicConfigurationOnZk(configName, configValue);
     }
 
     @DELETE
     @Path("/configuration/{configName}")
-    @ApiOperation(value = "Delete dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
+    @ApiOperation(value =
+            "Delete dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
     @ApiResponses(value = { @ApiResponse(code = 204, message = "Service configuration updated successfully"),
             @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
             @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 8fcef08..5e9dfbf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -523,7 +523,8 @@ public class ClustersBase extends AdminResource {
                             "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
             // construct the response to Namespace isolation data map
             if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
-                log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}", clientAppId(), policyName, cluster);
+                log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}",
+                        clientAppId(), policyName, cluster);
                 throw new RestException(Status.NOT_FOUND,
                         "Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
             }
@@ -732,7 +733,8 @@ public class ClustersBase extends AdminResource {
         }
     }
 
-    private boolean createZnodeIfNotExist(String path, Optional<Object> value) throws KeeperException, InterruptedException {
+    private boolean createZnodeIfNotExist(String path, Optional<Object> value)
+            throws KeeperException, InterruptedException {
         // create persistent node on ZooKeeper
         if (globalZk().exists(path, false) == null) {
             // create all the intermediate nodes
@@ -742,7 +744,7 @@ public class ClustersBase extends AdminResource {
                         CreateMode.PERSISTENT);
                 return true;
             } catch (KeeperException.NodeExistsException nee) {
-                if(log.isDebugEnabled()) {
+                if (log.isDebugEnabled()) {
                     log.debug("Other broker preempted the full path [{}] already. Continue...", path);
                 }
             } catch (JsonGenerationException e) {
@@ -979,7 +981,8 @@ public class ClustersBase extends AdminResource {
         validateClusterExists(cluster);
 
         try {
-            final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
+            final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT,
+                    domainName);
             globalZk().delete(domainPath, -1);
             // clear domain cache
             failureDomainCache().invalidate(domainPath);
@@ -1004,7 +1007,9 @@ public class ClustersBase extends AdminResource {
                         continue;
                     }
                     try {
-                        Optional<FailureDomain> domain = failureDomainCache().get(joinPath(failureDomainRootPath, domainName));
+                        Optional<FailureDomain> domain =
+                                failureDomainCache()
+                                        .get(joinPath(failureDomainRootPath, domainName));
                         if (domain.isPresent() && domain.get().brokers != null) {
                             List<String> duplicateBrokers = domain.get().brokers.stream().parallel()
                                     .filter(inputDomain.brokers::contains).collect(Collectors.toList());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 86ea652..2d2806b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -86,73 +86,91 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             final @FormDataParam("data") FormDataContentDisposition fileDetail,
             final @FormDataParam("url") String functionPkgUrl,
             @ApiParam(
-                    value = "A JSON value presenting configuration payload of a Pulsar Function. An example of the expected Pulsar Function can be found here.  \n" +
-                            "- **autoAck**  \n" +
-                            "  Whether or not the framework acknowledges messages automatically.  \n" +
-                            "- **runtime**  \n" +
-                            "  What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]  \n" +
-                            "- **resources**  \n" +
-                            "  The size of the system resources allowed by the Pulsar Function runtime. The resources include: cpu, ram, disk.  \n" +
-                            "- **className**  \n" +
-                            "  The class name of a Pulsar Function.  \n" +
-                            "- **customSchemaInputs**  \n" +
-                            "  The map of input topics to Schema class names (specified as a JSON object).  \n" +
-                            "- **customSerdeInputs**  \n" +
-                            "  The map of input topics to SerDe class names (specified as a JSON object).  \n" +
-                            "- **deadLetterTopic**  \n" +
-                            "  Messages that are not processed successfully are sent to `deadLetterTopic`.  \n" +
-                            "- **runtimeFlags**  \n" +
-                            "  Any flags that you want to pass to the runtime. Note that in thread mode, these flags have no impact.  \n" +
-                            "- **fqfn**  \n" +
-                            "  The Fully Qualified Function Name (FQFN) for the Pulsar Function.  \n" +
-                            "- **inputSpecs**  \n" +
-                            "   The map of input topics to its consumer configuration, each configuration has schema of " +
-                            "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5}  \n" +
-                            "- **inputs**  \n" +
-                            "  The input topic or topics (multiple topics can be specified as a comma-separated list) of a Pulsar Function.  \n" +
-                            "- **jar**  \n" +
-                            "  Path to the JAR file for the Pulsar Function (if the Pulsar Function is written in Java). " +
-                            "  It also supports URL path [http/https/file (file protocol assumes that file " +
-                            "  already exists on worker host)] from which worker can download the package.  \n" +
-                            "- **py**  \n" +
-                            "  Path to the main Python file or Python wheel file for the Pulsar Function (if the Pulsar Function is written in Python).  \n" +
-                            "- **go**  \n" +
-                            "  Path to the main Go executable binary for the Pulsar Function (if the Pulsar Function is written in Go).  \n" +
-                            "- **logTopic**  \n" +
-                            "  The topic to which the logs of a Pulsar Function are produced.  \n" +
-                            "- **maxMessageRetries**  \n" +
-                            "  How many times should we try to process a message before giving up.  \n" +
-                            "- **output**  \n" +
-                            "  The output topic of a Pulsar Function (If none is specified, no output is written).  \n" +
-                            "- **outputSerdeClassName**  \n" +
-                            "  The SerDe class to be used for messages output by the Pulsar Function.  \n" +
-                            "- **parallelism**  \n" +
-                            "  The parallelism factor of a Pulsar Function (i.e. the number of a Pulsar Function instances to run).  \n" +
-                            "- **processingGuarantees**  \n" +
-                            "  The processing guarantees (that is, delivery semantics) applied to the Pulsar Function." +
-                            "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]  \n" +
-                            "- **retainOrdering**  \n" +
-                            "  Function consumes and processes messages in order.  \n" +
-                            "- **outputSchemaType**  \n" +
-                            "   Represents either a builtin schema type (for example: 'avro', 'json', ect) or the class name for a Schema implementation." +
-                            "- **subName**  \n" +
-                            "  Pulsar source subscription name. User can specify a subscription-name for the input-topic consumer.  \n" +
-                            "- **windowConfig**  \n" +
-                            "  The window configuration of a Pulsar Function.  \n" +
-                            "- **timeoutMs**  \n" +
-                            "  The message timeout in milliseconds.  \n" +
-                            "- **topicsPattern**  \n" +
-                            "  The topic pattern to consume from a list of topics under a namespace that match the pattern." +
-                            "  [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a " +
-                            "  pattern in customSerdeInputs (supported for java fun only)  \n" +
-                            "- **userConfig**  \n" +
-                            "  A map of user-defined configurations (specified as a JSON object).  \n" +
-                            "- **secrets**  \n" +
-                            "  This is a map of secretName(that is how the secret is going to be accessed in the Pulsar Function via context) to an object that" +
-                            "  encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the" +
-                            "  SecretProviderConfigurator.getSecretObjectType() method. \n" +
-                            "- **cleanupSubscription**  \n" +
-                            "  Whether the subscriptions of a Pulsar Function created or used should be deleted when the Pulsar Function is deleted.  \n",
+                    value = "A JSON value presenting configuration payload of a Pulsar Function."
+                            + " An example of the expected Pulsar Function can be found here.\n"
+                            + "- **autoAck**\n"
+                            + "  Whether or not the framework acknowledges messages automatically.\n"
+                            + "- **runtime**\n"
+                            + "  What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]\n"
+                            + "- **resources**\n"
+                            + "  The size of the system resources allowed by the Pulsar Function runtime."
+                            + " The resources include: cpu, ram, disk.\n"
+                            + "- **className**\n"
+                            + "  The class name of a Pulsar Function.\n"
+                            + "- **customSchemaInputs**\n"
+                            + "  The map of input topics to Schema class names (specified as a JSON object).\n"
+                            + "- **customSerdeInputs**\n"
+                            + "  The map of input topics to SerDe class names (specified as a JSON object).\n"
+                            + "- **deadLetterTopic**\n"
+                            + "  Messages that are not processed successfully are sent to `deadLetterTopic`.\n"
+                            + "- **runtimeFlags**\n"
+                            + "  Any flags that you want to pass to the runtime."
+                            + " Note that in thread mode, these flags have no impact.\n"
+                            + "- **fqfn**\n"
+                            + "  The Fully Qualified Function Name (FQFN) for the Pulsar Function.\n"
+                            + "- **inputSpecs**\n"
+                            + "   The map of input topics to its consumer configuration,"
+                            + " each configuration has schema of "
+                            + "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
+                            + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
+                            + "- **inputs**\n"
+                            + "  The input topic or topics (multiple topics can be specified as"
+                            + " a comma-separated list) of a Pulsar Function.\n"
+                            + "- **jar**\n"
+                            + "  Path to the JAR file for the Pulsar Function"
+                            + " (if the Pulsar Function is written in Java). "
+                            + "  It also supports URL path [http/https/file (file protocol assumes that file "
+                            + "  already exists on worker host)] from which worker can download the package.\n"
+                            + "- **py**\n"
+                            + "  Path to the main Python file or Python wheel file for the"
+                            + " Pulsar Function (if the Pulsar Function is written in Python).\n"
+                            + "- **go**\n"
+                            + "  Path to the main Go executable binary for the Pulsar Function"
+                            + " (if the Pulsar Function is written in Go).\n"
+                            + "- **logTopic**\n"
+                            + "  The topic to which the logs of a Pulsar Function are produced.\n"
+                            + "- **maxMessageRetries**\n"
+                            + "  How many times should we try to process a message before giving up.\n"
+                            + "- **output**\n"
+                            + "  The output topic of a Pulsar Function"
+                            + " (If none is specified, no output is written).\n"
+                            + "- **outputSerdeClassName**\n"
+                            + "  The SerDe class to be used for messages output by the Pulsar Function.\n"
+                            + "- **parallelism**\n"
+                            + "  The parallelism factor of a Pulsar Function"
+                            + " (i.e. the number of a Pulsar Function instances to run).\n"
+                            + "- **processingGuarantees**\n"
+                            + "  The processing guarantees (that is, delivery semantics)"
+                            + " applied to the Pulsar Function."
+                            + "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
+                            + "- **retainOrdering**\n"
+                            + "  Function consumes and processes messages in order.\n"
+                            + "- **outputSchemaType**\n"
+                            + "   Represents either a builtin schema type (for example: 'avro', 'json', ect)"
+                            + " or the class name for a Schema implementation."
+                            + "- **subName**\n"
+                            + "  Pulsar source subscription name. User can specify a subscription-name"
+                            + " for the input-topic consumer.\n"
+                            + "- **windowConfig**\n"
+                            + "  The window configuration of a Pulsar Function.\n"
+                            + "- **timeoutMs**\n"
+                            + "  The message timeout in milliseconds.\n"
+                            + "- **topicsPattern**\n"
+                            + "  The topic pattern to consume from a list of topics under a namespace"
+                            + " that match the pattern."
+                            + "  [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a "
+                            + "  pattern in customSerdeInputs (supported for java fun only)\n"
+                            + "- **userConfig**\n"
+                            + "  A map of user-defined configurations (specified as a JSON object).\n"
+                            + "- **secrets**\n"
+                            + "  This is a map of secretName(that is how the secret is going to be accessed"
+                            + " in the Pulsar Function via context) to an object that"
+                            + "  encapsulates how the secret is fetched by the underlying secrets provider."
+                            + " The type of an value here can be found by the"
+                            + "  SecretProviderConfigurator.getSecretObjectType() method. \n"
+                            + "- **cleanupSubscription**\n"
+                            + "  Whether the subscriptions of a Pulsar Function created or used should be deleted"
+                            + " when the Pulsar Function is deleted.\n",
                     examples = @Example(
                             value = @ExampleProperty(
                                     mediaType = MediaType.APPLICATION_JSON,
@@ -193,73 +211,90 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             final @FormDataParam("data") FormDataContentDisposition fileDetail,
             final @FormDataParam("url") String functionPkgUrl,
             @ApiParam(
-                    value = "A JSON value presenting configuration payload of a Pulsar Function. An example of the expected Pulsar Function can be found here.  \n" +
-                            "- **autoAck**  \n" +
-                            "  Whether or not the framework acknowledges messages automatically.  \n" +
-                            "- **runtime**  \n" +
-                            "  What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]  \n" +
-                            "- **resources**  \n" +
-                            "  The size of the system resources allowed by the Pulsar Function runtime. The resources include: cpu, ram, disk.  \n" +
-                            "- **className**  \n" +
-                            "  The class name of a Pulsar Function.  \n" +
-                            "- **customSchemaInputs**  \n" +
-                            "  The map of input topics to Schema class names (specified as a JSON object).  \n" +
-                            "- **customSerdeInputs**  \n" +
-                            "  The map of input topics to SerDe class names (specified as a JSON object).  \n" +
-                            "- **deadLetterTopic**  \n" +
-                            "  Messages that are not processed successfully are sent to `deadLetterTopic`.  \n" +
-                            "- **runtimeFlags**  \n" +
-                            "  Any flags that you want to pass to the runtime. Note that in thread mode, these flags have no impact.  \n" +
-                            "- **fqfn**  \n" +
-                            "  The Fully Qualified Function Name (FQFN) for the Pulsar Function.  \n" +
-                            "- **inputSpecs**  \n" +
-                            "   The map of input topics to its consumer configuration, each configuration has schema of " +
-                            "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5}  \n" +
-                            "- **inputs**  \n" +
-                            "  The input topic or topics (multiple topics can be specified as a comma-separated list) of a Pulsar Function.  \n" +
-                            "- **jar**  \n" +
-                            "  Path to the JAR file for the Pulsar Function (if the Pulsar Function is written in Java). " +
-                            "  It also supports URL path [http/https/file (file protocol assumes that file " +
-                            "  already exists on worker host)] from which worker can download the package.  \n" +
-                            "- **py**  \n" +
-                            "  Path to the main Python file or Python wheel file for the Pulsar Function (if the Pulsar Function is written in Python).  \n" +
-                            "- **go**  \n" +
-                            "  Path to the main Go executable binary for the Pulsar Function (if the Pulsar Function is written in Go).  \n" +
-                            "- **logTopic**  \n" +
-                            "  The topic to which the logs of a Pulsar Function are produced.  \n" +
-                            "- **maxMessageRetries**  \n" +
-                            "  How many times should we try to process a message before giving up.  \n" +
-                            "- **output**  \n" +
-                            "  The output topic of a Pulsar Function (If none is specified, no output is written).  \n" +
-                            "- **outputSerdeClassName**  \n" +
-                            "  The SerDe class to be used for messages output by the Pulsar Function.  \n" +
-                            "- **parallelism**  \n" +
-                            "  The parallelism factor of a Pulsar Function (i.e. the number of a Pulsar Function instances to run).  \n" +
-                            "- **processingGuarantees**  \n" +
-                            "  The processing guarantees (that is, delivery semantics) applied to the Pulsar Function." +
-                            "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]  \n" +
-                            "- **retainOrdering**  \n" +
-                            "  Function consumes and processes messages in order.  \n" +
-                            "- **outputSchemaType**  \n" +
-                            "   Represents either a builtin schema type (for example: 'avro', 'json', ect) or the class name for a Schema implementation." +
-                            "- **subName**  \n" +
-                            "  Pulsar source subscription name. User can specify a subscription-name for the input-topic consumer.  \n" +
-                            "- **windowConfig**  \n" +
-                            "  The window configuration of a Pulsar Function.  \n" +
-                            "- **timeoutMs**  \n" +
-                            "  The message timeout in milliseconds.  \n" +
-                            "- **topicsPattern**  \n" +
-                            "  The topic pattern to consume from a list of topics under a namespace that match the pattern." +
-                            "  [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a " +
-                            "  pattern in customSerdeInputs (supported for java fun only)  \n" +
-                            "- **userConfig**  \n" +
-                            "  A map of user-defined configurations (specified as a JSON object).  \n" +
-                            "- **secrets**  \n" +
-                            "  This is a map of secretName(that is how the secret is going to be accessed in the Pulsar Function via context) to an object that" +
-                            "  encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the" +
-                            "  SecretProviderConfigurator.getSecretObjectType() method. \n" +
-                            "- **cleanupSubscription**  \n" +
-                            "  Whether the subscriptions of a Pulsar Function created or used should be deleted when the Pulsar Function is deleted.  \n",
+                    value = "A JSON value presenting configuration payload of a Pulsar Function."
+                            + " An example of the expected Pulsar Function can be found here.\n"
+                            + "- **autoAck**\n"
+                            + "  Whether or not the framework acknowledges messages automatically.\n"
+                            + "- **runtime**\n"
+                            + "  What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]\n"
+                            + "- **resources**\n"
+                            + "  The size of the system resources allowed by the Pulsar Function runtime."
+                            + " The resources include: cpu, ram, disk.\n"
+                            + "- **className**\n"
+                            + "  The class name of a Pulsar Function.\n"
+                            + "- **customSchemaInputs**\n"
+                            + "  The map of input topics to Schema class names (specified as a JSON object).\n"
+                            + "- **customSerdeInputs**\n"
+                            + "  The map of input topics to SerDe class names (specified as a JSON object).\n"
+                            + "- **deadLetterTopic**\n"
+                            + "  Messages that are not processed successfully are sent to `deadLetterTopic`.\n"
+                            + "- **runtimeFlags**\n"
+                            + "  Any flags that you want to pass to the runtime."
+                            + " Note that in thread mode, these flags have no impact.\n"
+                            + "- **fqfn**\n"
+                            + "  The Fully Qualified Function Name (FQFN) for the Pulsar Function.\n"
+                            + "- **inputSpecs**\n"
+                            + "   The map of input topics to its consumer configuration,"
+                            + " each configuration has schema of "
+                            + "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
+                            + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
+                            + "- **inputs**\n"
+                            + "  The input topic or topics (multiple topics can be specified as"
+                            + " a comma-separated list) of a Pulsar Function.\n"
+                            + "- **jar**\n"
+                            + "  Path to the JAR file for the Pulsar Function"
+                            + " (if the Pulsar Function is written in Java). "
+                            + "  It also supports URL path [http/https/file (file protocol assumes that file "
+                            + "  already exists on worker host)] from which worker can download the package.\n"
+                            + "- **py**\n"
+                            + "  Path to the main Python file or Python wheel file for the Pulsar Function"
+                            + " (if the Pulsar Function is written in Python).\n"
+                            + "- **go**\n"
+                            + "  Path to the main Go executable binary for the Pulsar Function"
+                            + " (if the Pulsar Function is written in Go).\n"
+                            + "- **logTopic**\n"
+                            + "  The topic to which the logs of a Pulsar Function are produced.\n"
+                            + "- **maxMessageRetries**\n"
+                            + "  How many times should we try to process a message before giving up.\n"
+                            + "- **output**\n"
+                            + "  The output topic of a Pulsar Function (If none is specified, no output is written).\n"
+                            + "- **outputSerdeClassName**\n"
+                            + "  The SerDe class to be used for messages output by the Pulsar Function.\n"
+                            + "- **parallelism**\n"
+                            + "  The parallelism factor of a Pulsar Function "
+                            + "(i.e. the number of a Pulsar Function instances to run).\n"
+                            + "- **processingGuarantees**\n"
+                            + "  The processing guarantees (that is, delivery semantics)"
+                            + " applied to the Pulsar Function."
+                            + "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
+                            + "- **retainOrdering**\n"
+                            + "  Function consumes and processes messages in order.\n"
+                            + "- **outputSchemaType**\n"
+                            + "   Represents either a builtin schema type (for example: 'avro', 'json', ect)"
+                            + " or the class name for a Schema implementation."
+                            + "- **subName**\n"
+                            + "  Pulsar source subscription name. User can specify"
+                            + " a subscription-name for the input-topic consumer.\n"
+                            + "- **windowConfig**\n"
+                            + "  The window configuration of a Pulsar Function.\n"
+                            + "- **timeoutMs**\n"
+                            + "  The message timeout in milliseconds.\n"
+                            + "- **topicsPattern**\n"
+                            + "  The topic pattern to consume from a list of topics"
+                            + " under a namespace that match the pattern."
+                            + "  [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a "
+                            + "  pattern in customSerdeInputs (supported for java fun only)\n"
+                            + "- **userConfig**\n"
+                            + "  A map of user-defined configurations (specified as a JSON object).\n"
+                            + "- **secrets**\n"
+                            + "  This is a map of secretName(that is how the secret is going to be accessed"
+                            + " in the Pulsar Function via context) to an object that"
+                            + "  encapsulates how the secret is fetched by the underlying secrets provider."
+                            + " The type of an value here can be found by the"
+                            + "  SecretProviderConfigurator.getSecretObjectType() method. \n"
+                            + "- **cleanupSubscription**\n"
+                            + "  Whether the subscriptions of a Pulsar Function created or used"
+                            + " should be deleted when the Pulsar Function is deleted.\n",
                     examples = @Example(
                             value = @ExampleProperty(
                                     mediaType = MediaType.APPLICATION_JSON,
@@ -339,15 +374,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
     public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
-            @ApiParam(value = "The tenant of a Pulsar Function")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Function")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Function")
-            final @PathParam("functionName") String functionName,
-            @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, the stats of all instances is returned")
-            final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+            @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+            @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided,"
+                    + " the stats of all instances is returned") final @PathParam("instanceId")
+                    String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName,
+                instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -370,7 +404,8 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             final @PathParam("namespace") String namespace,
             @ApiParam(value = "The name of a Pulsar Function")
             final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
+        return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri(),
+                clientAppId(), clientAuthData());
     }
 
     @GET
@@ -393,7 +428,8 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             final @PathParam("namespace") String namespace,
             @ApiParam(value = "The name of a Pulsar Function")
             final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
+        return functions.getFunctionStats(tenant, namespace, functionName,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -410,15 +446,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
-            @ApiParam(value = "The tenant of a Pulsar Function")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Function")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Function")
-            final @PathParam("functionName") String functionName,
-            @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, the stats of all instances is returned")
-            final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+            @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+            @ApiParam(value = "The instanceId of a Pulsar Function"
+                    + " (if instance-id is not provided, the stats of all instances is returned") final @PathParam(
+                    "instanceId") String instanceId) throws IOException {
+        return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -454,19 +489,19 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public String triggerFunction(
-            @ApiParam(value = "The tenant of a Pulsar Function")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Function")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Function")
-            final @PathParam("functionName") String functionName,
-            @ApiParam(value = "The value with which you want to trigger the Pulsar Function")
-            final @FormDataParam("data") String triggerValue,
-            @ApiParam(value = "The path to the file that contains the data with which you'd like to trigger the Pulsar Function")
-            final @FormDataParam("dataStream") InputStream triggerStream,
-            @ApiParam(value = "The specific topic name that the Pulsar Function consumes from which you want to inject the data to")
-            final @FormDataParam("topic") String topic) {
-        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientAppId(), clientAuthData());
+            @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+            @ApiParam(value = "The value with which you want to trigger the Pulsar Function") final @FormDataParam(
+                    "data") String triggerValue,
+            @ApiParam(value = "The path to the file that contains the data with"
+                    + " which you'd like to trigger the Pulsar Function") final @FormDataParam("dataStream")
+                    InputStream triggerStream,
+            @ApiParam(value = "The specific topic name that the Pulsar Function"
+                    + " consumes from which you want to inject the data to") final @FormDataParam("topic")
+                    String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue,
+                triggerStream, topic, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -524,15 +559,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
     public void restartFunction(
-            @ApiParam(value = "The tenant of a Pulsar Function")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Function")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Function")
-            final @PathParam("functionName") String functionName,
-            @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are restarted")
+            @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+            @ApiParam(value =
+                    "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are restarted")
             final @PathParam("instanceId") String instanceId) {
-        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -564,15 +598,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
     public void stopFunction(
-            @ApiParam(value = "The tenant of a Pulsar Function")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Function")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Function")
-            final @PathParam("functionName") String functionName,
-            @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are stopped. ")
+            @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+            @ApiParam(value =
+                    "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are stopped. ")
             final @PathParam("instanceId") String instanceId) {
-        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -604,15 +637,14 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start")
     @Consumes(MediaType.APPLICATION_JSON)
     public void startFunction(
-            @ApiParam(value = "The tenant of a Pulsar Function")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Function")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Function")
-            final @PathParam("functionName") String functionName,
-            @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided, all instances sre started. ")
-            final @PathParam("instanceId") String instanceId) {
-        functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+            @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
+            @ApiParam(value = "The instanceId of a Pulsar Function"
+                    + " (if instance-id is not provided, all instances sre started. ") final @PathParam("instanceId")
+                    String instanceId) {
+        functions.startFunctionInstance(tenant, namespace, functionName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -704,10 +736,11 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Path("/leader/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenant,
-                                                 final @PathParam("namespace") String namespace,
-                                                 final @PathParam("functionName") String functionName,
-                                                 final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
-                                                 final @FormDataParam("delete") boolean delete) {
+                                             final @PathParam("namespace") String namespace,
+                                             final @PathParam("functionName") String functionName,
+                                             final @FormDataParam("functionMetaData")
+                                                         InputStream uploadedInputStream,
+                                             final @FormDataParam("delete") boolean delete) {
 
         functions.updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
                 delete, uri.getRequestUri(), clientAppId());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a1f4821..3fbe44d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -526,8 +526,9 @@ public abstract class NamespacesBase extends AdminResource {
                     }
                     URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
                             .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
-                    if(log.isDebugEnabled()) {
-                        log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, replCluster);
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Redirecting the rest call to {}: cluster={}",
+                                clientAppId(), redirect, replCluster);
                     }
                     throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
                 }
@@ -597,9 +598,11 @@ public abstract class NamespacesBase extends AdminResource {
                                 "The replication cluster does not provide TLS encrypted service");
                     }
                     URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
-                            .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
-                    if(log.isDebugEnabled()) {
-                        log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, replCluster);
+                            .port(replClusterUrl.getPort())
+                            .replaceQueryParam("authoritative", false).build();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Redirecting the rest call to {}: cluster={}",
+                                clientAppId(), redirect, replCluster);
                     }
                     throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
                 }
@@ -796,7 +799,8 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to update the replication clusters on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to update the replication clusters on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -835,7 +839,8 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to update the message TTL on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to update the message TTL"
+                            + " on namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -874,7 +879,8 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to update the subscription expiration time on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to update the subscription expiration time on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
@@ -884,7 +890,8 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
+    protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
+                                                AutoTopicCreationOverride autoTopicCreationOverride) {
         final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
@@ -893,7 +900,8 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
         }
         if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
+            throw new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be less than or equal to " + maxPartitions);
         }
 
         // Force to read the data s.t. the watch to the cache content is setup.
@@ -905,12 +913,16 @@ public abstract class NamespacesBase extends AdminResource {
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    jsonMapper().writeValueAsBytes(
+                                            policiesNode.getKey()), policiesNode.getValue().getVersion(),
                                     (rc, path1, ctx, stat) -> {
                                         if (rc == KeeperException.Code.OK.intValue()) {
                                             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-                                            String autoOverride = autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled";
-                                            log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
+                                            String autoOverride =
+                                                    autoTopicCreationOverride
+                                                            .allowAutoTopicCreation ? "enabled" : "disabled";
+                                            log.info("[{}] Successfully {} autoTopicCreation on namespace {}",
+                                                    clientAppId(), autoOverride, namespaceName);
                                             asyncResponse.resume(Response.noContent().build());
                                         } else {
                                             String errorMsg = String.format(
@@ -918,28 +930,35 @@ public abstract class NamespacesBase extends AdminResource {
                                                     clientAppId(), namespaceName);
                                             if (rc == KeeperException.Code.NONODE.intValue()) {
                                                 log.warn("{} : does not exist", errorMsg);
-                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                                        "Namespace does not exist"));
                                             } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
                                                 log.warn("{} : concurrent modification", errorMsg);
-                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                                asyncResponse.resume(new RestException(Status.CONFLICT,
+                                                        "Concurrent modification"));
                                             } else {
-                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                                asyncResponse.resume(
+                                                        KeeperException.create(
+                                                                KeeperException.Code.get(rc), errorMsg));
                                             }
                                         }
                                     }, null);
                             return null;
                         } catch (Exception e) {
-                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}",
+                                    clientAppId(), namespaceName, e);
                             asyncResponse.resume(new RestException(e));
                             return null;
                         }
                     } else {
-                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "Namespace " + namespaceName + " does not exist"));
                         return null;
                     }
                 }
         ).exceptionally(e -> {
-            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}",
+                    clientAppId(), namespaceName, e);
             asyncResponse.resume(new RestException(e));
             return null;
         });
@@ -958,11 +977,14 @@ public abstract class NamespacesBase extends AdminResource {
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()),
+                                    policiesNode.getValue().getVersion(),
                                     (rc, path1, ctx, stat) -> {
                                         if (rc == KeeperException.Code.OK.intValue()) {
                                             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-                                            log.info("[{}] Successfully removed autoTopicCreation override on namespace {}", clientAppId(), namespaceName);
+                                            log.info("[{}] Successfully removed autoTopicCreation override"
+                                                            + " on namespace {}",
+                                                    clientAppId(), namespaceName);
                                             asyncResponse.resume(Response.noContent().build());
                                         } else {
                                             String errorMsg = String.format(
@@ -970,34 +992,41 @@ public abstract class NamespacesBase extends AdminResource {
                                                     clientAppId(), namespaceName);
                                             if (rc == KeeperException.Code.NONODE.intValue()) {
                                                 log.warn("{} : does not exist", errorMsg);
-                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                                        "Namespace does not exist"));
                                             } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
                                                 log.warn("{} : concurrent modification", errorMsg);
-                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                                asyncResponse.resume(new RestException(Status.CONFLICT,
+                                                        "Concurrent modification"));
                                             } else {
-                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                                asyncResponse.resume(KeeperException.create(
+                                                        KeeperException.Code.get(rc), errorMsg));
                                             }
                                         }
                                     }, null);
                             return null;
                         } catch (Exception e) {
-                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}",
+                                    clientAppId(), namespaceName, e);
                             asyncResponse.resume(new RestException(e));
                             return null;
                         }
                     } else {
-                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "Namespace " + namespaceName + " does not exist"));
                         return null;
                     }
                 }
         ).exceptionally(e -> {
-            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}",
+                    clientAppId(), namespaceName, e);
             asyncResponse.resume(new RestException(e));
             return null;
         });
     }
 
-    protected void internalSetAutoSubscriptionCreation(AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
+    protected void internalSetAutoSubscriptionCreation(
+            AsyncResponse asyncResponse, AutoSubscriptionCreationOverride autoSubscriptionCreationOverride) {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
 
@@ -1010,41 +1039,52 @@ public abstract class NamespacesBase extends AdminResource {
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()),
+                                    policiesNode.getValue().getVersion(),
                                     (rc, path1, ctx, stat) -> {
                                         if (rc == KeeperException.Code.OK.intValue()) {
                                             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-                                            String autoOverride = autoSubscriptionCreationOverride.allowAutoSubscriptionCreation ? "enabled" : "disabled";
-                                            log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}", clientAppId(), autoOverride, namespaceName);
+                                            String autoOverride =
+                                                    autoSubscriptionCreationOverride
+                                                            .allowAutoSubscriptionCreation ? "enabled" : "disabled";
+                                            log.info("[{}] Successfully {} autoSubscriptionCreation on namespace {}",
+                                                    clientAppId(), autoOverride, namespaceName);
                                             asyncResponse.resume(Response.noContent().build());
                                         } else {
                                             String errorMsg = String.format(
-                                                    "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+                                                    "[%s] Failed to modify autoSubscriptionCreation"
+                                                            + " status for namespace %s",
                                                     clientAppId(), namespaceName);
                                             if (rc == KeeperException.Code.NONODE.intValue()) {
                                                 log.warn("{} : does not exist", errorMsg);
-                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                                        "Namespace does not exist"));
                                             } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
                                                 log.warn("{} : concurrent modification", errorMsg);
-                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                                asyncResponse.resume(new RestException(Status.CONFLICT,
+                                                        "Concurrent modification"));
                                             } else {
-                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                                asyncResponse.resume(KeeperException.create(
+                                                        KeeperException.Code.get(rc), errorMsg));
                                             }
                                         }
                                     }, null);
                             return null;
                         } catch (Exception e) {
-                            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}",
+                                    clientAppId(), namespaceName, e);
                             asyncResponse.resume(new RestException(e));
                             return null;
                         }
                     } else {
-                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "Namespace " + namespaceName + " does not exist"));
                         return null;
                     }
                 }
         ).exceptionally(e -> {
-            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}",
+                    clientAppId(), namespaceName, e);
             asyncResponse.resume(new RestException(e));
             return null;
         });
@@ -1063,40 +1103,50 @@ public abstract class NamespacesBase extends AdminResource {
                         try {
                             // Write back the new policies into zookeeper
                             globalZk().setData(path(POLICIES, namespaceName.toString()),
-                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion(),
+                                    jsonMapper().writeValueAsBytes(
+                                            policiesNode.getKey()), policiesNode.getValue().getVersion(),
                                     (rc, path1, ctx, stat) -> {
                                         if (rc == KeeperException.Code.OK.intValue()) {
                                             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-                                            log.info("[{}] Successfully removed autoSubscriptionCreation override on namespace {}", clientAppId(), namespaceName);
+                                            log.info("[{}] Successfully removed autoSubscriptionCreation"
+                                                            + " override on namespace {}",
+                                                    clientAppId(), namespaceName);
                                             asyncResponse.resume(Response.noContent().build());
                                         } else {
                                             String errorMsg = String.format(
-                                                    "[%s] Failed to modify autoSubscriptionCreation status for namespace %s",
+                                                    "[%s] Failed to modify autoSubscriptionCreation"
+                                                            + " status for namespace %s",
                                                     clientAppId(), namespaceName);
                                             if (rc == KeeperException.Code.NONODE.intValue()) {
                                                 log.warn("{} : does not exist", errorMsg);
-                                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                                        "Namespace does not exist"));
                                             } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
                                                 log.warn("{} : concurrent modification", errorMsg);
-                                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                                                asyncResponse.resume(new RestException(Status.CONFLICT,
+                                                        "Concurrent modification"));
                                             } else {
-                                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                                                asyncResponse.resume(KeeperException.create(
+                                                        KeeperException.Code.get(rc), errorMsg));
                                             }
                                         }
                                     }, null);
                             return null;
                         } catch (Exception e) {
-                            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}",
+                                    clientAppId(), namespaceName, e);
                             asyncResponse.resume(new RestException(e));
                             return null;
                         }
                     } else {
-                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "Namespace " + namespaceName + " does not exist"));
                         return null;
                     }
                 }
         ).exceptionally(e -> {
-            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}", clientAppId(), namespaceName, e);
+            log.error("[{}] Failed to modify autoSubscriptionCreation status on namespace {}",
+                    clientAppId(), namespaceName, e);
             asyncResponse.resume(new RestException(e));
             return null;
         });
@@ -1127,12 +1177,14 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to modify deduplication status on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
-            log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e);
+            log.error("[{}] Failed to modify deduplication status on namespace {}",
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -1254,15 +1306,16 @@ public abstract class NamespacesBase extends AdminResource {
         String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString());
         try {
             Optional<LocalPolicies> policies = pulsar().getLocalZkCacheService().policiesCache().get(path);
-            final BookieAffinityGroupData bookkeeperAffinityGroup = policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                    "Namespace local-policies does not exist")).bookieAffinityGroup;
+            final BookieAffinityGroupData bookkeeperAffinityGroup =
+                    policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
+                            "Namespace local-policies does not exist")).bookieAffinityGroup;
             if (bookkeeperAffinityGroup == null) {
                 throw new RestException(Status.NOT_FOUND, "bookie-affinity group does not exist");
             }
             return bookkeeperAffinityGroup;
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace policies does not exist");
         } catch (RestException re) {
             throw re;
@@ -1281,13 +1334,16 @@ public abstract class NamespacesBase extends AdminResource {
 
         Policies policies = getNamespacePolicies(namespaceName);
 
-        NamespaceBundle bundle = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName.toString(), bundleRange);
+        NamespaceBundle bundle =
+                pulsar().getNamespaceService().getNamespaceBundleFactory()
+                        .getBundle(namespaceName.toString(), bundleRange);
         boolean isOwnedByLocalCluster = false;
         try {
             isOwnedByLocalCluster = pulsar().getNamespaceService().isNamespaceBundleOwned(bundle).get();
         } catch (Exception e) {
-            if(log.isDebugEnabled()) {
-                log.debug("Failed to validate cluster ownership for {}-{}, {}", namespaceName.toString(), bundleRange, e.getMessage(), e);
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to validate cluster ownership for {}-{}, {}",
+                        namespaceName.toString(), bundleRange, e.getMessage(), e);
             }
         }
 
@@ -1328,7 +1384,8 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
+    protected void internalSplitNamespaceBundle(String bundleRange,
+                                                boolean authoritative, boolean unload, String splitAlgorithmName) {
         validateSuperUserAccess();
         checkNotNull(bundleRange, "BundleRange should not be null");
         log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
@@ -1347,14 +1404,18 @@ public abstract class NamespacesBase extends AdminResource {
         NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
                 authoritative, true);
 
-        List<String> supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
-        if (StringUtils.isNotBlank(splitAlgorithmName) && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
+        List<String> supportedNamespaceBundleSplitAlgorithms =
+                pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
+        if (StringUtils.isNotBlank(splitAlgorithmName)
+                && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
             throw new RestException(Status.PRECONDITION_FAILED,
-                "Unsupported namespace bundle split algorithm, supported algorithms are " + supportedNamespaceBundleSplitAlgorithms);
+                    "Unsupported namespace bundle split algorithm, supported algorithms are "
+                            + supportedNamespaceBundleSplitAlgorithms);
         }
 
         try {
-            pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
+            pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
+                    getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
             log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
         } catch (ExecutionException e) {
             if (e.getCause() instanceof IllegalArgumentException) {
@@ -1374,7 +1435,8 @@ public abstract class NamespacesBase extends AdminResource {
     private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
         NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
         if (algorithm == null) {
-            algorithm = NamespaceBundleSplitAlgorithm.of(pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
+            algorithm = NamespaceBundleSplitAlgorithm.of(
+                    pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
         }
         if (algorithm == null) {
             algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
@@ -1393,14 +1455,16 @@ public abstract class NamespacesBase extends AdminResource {
             // Force to read the data s.t. the watch to the cache content is setup.
             policiesNode = policiesCache().getWithStat(path).orElseThrow(
                     () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
-            policiesNode.getKey().publishMaxMessageRate.put(pulsar().getConfiguration().getClusterName(), maxPublishMessageRate);
+            policiesNode.getKey().publishMaxMessageRate.put(
+                    pulsar().getConfiguration().getClusterName(), maxPublishMessageRate);
 
             // Write back the new policies into zookeeper
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
                     policiesNode.getValue().getVersion());
             policiesCache().invalidate(path);
 
-            log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}", clientAppId(),
+            log.info("[{}] Successfully updated the publish_max_message_rate for cluster on namespace {}",
+                    clientAppId(),
                     namespaceName);
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update the publish_max_message_rate for cluster on namespace {}: does not exist",
@@ -1408,7 +1472,8 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to update the publish_max_message_rate for cluster on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to update the publish_max_message_rate for cluster on namespace {}"
+                            + " expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -1443,7 +1508,8 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to remove the publish_max_message_rate for cluster on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to remove the publish_max_message_rate for cluster on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -1495,7 +1561,8 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to update the dispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to update the dispatchRate for cluster on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -1533,24 +1600,26 @@ public abstract class NamespacesBase extends AdminResource {
             final String path = path(POLICIES, namespaceName.toString());
             // Force to read the data s.t. the watch to the cache content is setup.
             policiesNode = policiesCache().getWithStat(path).orElseThrow(
-                () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
-            policiesNode.getKey().subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+                    () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+            policiesNode.getKey().subscriptionDispatchRate
+                    .put(pulsar().getConfiguration().getClusterName(), dispatchRate);
 
             // Write back the new policies into zookeeper
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
-                policiesNode.getValue().getVersion());
+                    policiesNode.getValue().getVersion());
             policiesCache().invalidate(path);
 
-            log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
-                namespaceName);
+            log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}",
+                    clientAppId(), namespaceName);
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}: does not exist",
                 clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                "[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
-                clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+                    "[{}] Failed to update the subscriptionDispatchRate for cluster on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
+                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
@@ -1564,12 +1633,14 @@ public abstract class NamespacesBase extends AdminResource {
         validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
-        DispatchRate dispatchRate = policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
+        DispatchRate dispatchRate =
+                policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
         if (dispatchRate != null) {
             return dispatchRate;
         } else {
             throw new RestException(Status.NOT_FOUND,
-                "Subscription-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
+                    "Subscription-Dispatch-rate is not configured for cluster "
+                            + pulsar().getConfiguration().getClusterName());
         }
     }
 
@@ -1585,7 +1656,8 @@ public abstract class NamespacesBase extends AdminResource {
             // Force to read the data s.t. the watch to the cache content is setup.
             policiesNode = policiesCache().getWithStat(path).orElseThrow(
                     () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
-            policiesNode.getKey().clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
+            policiesNode.getKey().clusterSubscribeRate.put(
+                    pulsar().getConfiguration().getClusterName(), subscribeRate);
 
             // Write back the new policies into zookeeper
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
@@ -1600,7 +1672,8 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to update the subscribeRate for cluster on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to update the subscribeRate for cluster on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
@@ -1633,24 +1706,26 @@ public abstract class NamespacesBase extends AdminResource {
             final String path = path(POLICIES, namespaceName.toString());
             // Force to read the data s.t. the watch to the cache content is setup.
             policiesNode = policiesCache().getWithStat(path).orElseThrow(
-                () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
-            policiesNode.getKey().replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
+                    () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+            policiesNode.getKey().replicatorDispatchRate.put(
+                    pulsar().getConfiguration().getClusterName(), dispatchRate);
 
             // Write back the new policies into zookeeper
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
-                policiesNode.getValue().getVersion());
+                    policiesNode.getValue().getVersion());
             policiesCache().invalidate(path);
 
             log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
-                namespaceName);
+                    namespaceName);
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}: does not exist",
                 clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                "[{}] Failed to update the replicatorDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
-                clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+                    "[{}] Failed to update the replicatorDispatchRate for cluster on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
+                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
@@ -1669,7 +1744,8 @@ public abstract class NamespacesBase extends AdminResource {
             return dispatchRate;
         } else {
             throw new RestException(Status.NOT_FOUND,
-                "replicator-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
+                    "replicator-Dispatch-rate is not configured for cluster "
+                            + pulsar().getConfiguration().getClusterName());
         }
     }
 
@@ -1692,10 +1768,12 @@ public abstract class NamespacesBase extends AdminResource {
                 p.backlog_quota_map.put(backlogQuotaType, backlogQuota);
                 if (!checkQuotas(p, r)) {
                     log.warn(
-                            "[{}] Failed to update backlog configuration for namespace {}: conflicts with retention quota",
+                            "[{}] Failed to update backlog configuration"
+                                    + " for namespace {}: conflicts with retention quota",
                             clientAppId(), namespaceName);
                     throw new RestException(Status.PRECONDITION_FAILED,
-                            "Backlog Quota exceeds configured retention quota for namespace. Please increase retention quota and retry");
+                            "Backlog Quota exceeds configured retention quota for namespace."
+                                    + " Please increase retention quota and retry");
                 }
             }
             policies.backlog_quota_map.put(backlogQuotaType, backlogQuota);
@@ -1709,8 +1787,8 @@ public abstract class NamespacesBase extends AdminResource {
                     namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
@@ -1744,8 +1822,8 @@ public abstract class NamespacesBase extends AdminResource {
                     namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
             log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e);
@@ -1764,7 +1842,8 @@ public abstract class NamespacesBase extends AdminResource {
             byte[] content = globalZk().getData(path, null, nodeStat);
             Policies policies = jsonMapper().readValue(content, Policies.class);
             if (!checkQuotas(policies, retention)) {
-                log.warn("[{}] Failed to update retention configuration for namespace {}: conflicts with backlog quota",
+                log.warn("[{}] Failed to update retention configuration"
+                                + " for namespace {}: conflicts with backlog quota",
                         clientAppId(), namespaceName);
                 throw new RestException(Status.PRECONDITION_FAILED,
                         "Retention Quota must exceed configured backlog quota for namespace.");
@@ -1776,8 +1855,8 @@ public abstract class NamespacesBase extends AdminResource {
                     namespaceName, jsonMapper().writeValueAsString(policies.retention_policies));
 
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update retention configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update retention configuration for namespace {}: does not exist",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn("[{}] Failed to update retention configuration for namespace {}: concurrent modification",
@@ -2092,13 +2171,14 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to modify encryption required status on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to modify encryption required status on"
+                            + " namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
-            log.error("[{}] Failed to modify encryption required status on namespace {}", clientAppId(), namespaceName,
-                    e);
+            log.error("[{}] Failed to modify encryption required status on namespace {}", clientAppId(),
+                    namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2333,13 +2413,15 @@ public abstract class NamespacesBase extends AdminResource {
                     subscription = PersistentReplicator.getRemoteCluster(subscription);
                 }
                 for (Topic topic : topicList) {
-                    if (topic instanceof PersistentTopic && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
+                    if (topic instanceof PersistentTopic
+                            && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
                         futures.add(((PersistentTopic) topic).clearBacklog(subscription));
                     }
                 }
             } else {
                 for (Topic topic : topicList) {
-                    if (topic instanceof PersistentTopic && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
+                    if (topic instanceof PersistentTopic
+                            && !SystemTopicClient.isSystemTopic(TopicName.get(topic.getName()))) {
                         futures.add(((PersistentTopic) topic).clearBacklog());
                     }
                 }
@@ -2383,12 +2465,10 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     /**
-     * It validates that peer-clusters can't coexist in replication-clusters
+     * It validates that peer-clusters can't coexist in replication-clusters.
      *
-     * @param clusterName:
-     *            given cluster whose peer-clusters can't be present into replication-cluster list
-     * @param replicationClusters:
-     *            replication-cluster list
+     * @param clusterName:         given cluster whose peer-clusters can't be present into replication-cluster list
+     * @param replicationClusters: replication-cluster list
      */
     private void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) {
         try {
@@ -2493,11 +2573,11 @@ public abstract class NamespacesBase extends AdminResource {
                 "Invalid retention policy: size limit must be >= -1");
         checkArgument(retention.getRetentionTimeInMinutes() >= -1,
                 "Invalid retention policy: time limit must be >= -1");
-        checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0) ||
-                        (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0),
-                "Invalid retention policy: Setting a single time or size limit to 0 is invalid when " +
-                        "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a " +
-                        "specific limit. To disable retention both limits must be set to 0.");
+        checkArgument((retention.getRetentionTimeInMinutes() != 0 && retention.getRetentionSizeInMB() != 0)
+                        || (retention.getRetentionTimeInMinutes() == 0 && retention.getRetentionSizeInMB() == 0),
+                "Invalid retention policy: Setting a single time or size limit to 0 is invalid when "
+                        + "one of the limits has a non-zero value. Use the value of -1 instead of 0 to ignore a "
+                        + "specific limit. To disable retention both limits must be set to 0.");
     }
 
     protected int internalGetMaxProducersPerTopic() {
@@ -2534,22 +2614,23 @@ public abstract class NamespacesBase extends AdminResource {
             policies.max_producers_per_topic = maxProducersPerTopic;
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-            log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}", clientAppId(),
-                    namespaceName, policies.max_producers_per_topic);
+            log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}",
+                    clientAppId(), namespaceName, policies.max_producers_per_topic);
 
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: does not exist",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update maxProducersPerTopic configuration for namespace {}: concurrent modification",
+            log.warn("[{}] Failed to update maxProducersPerTopic configuration for"
+                            + " namespace {}: concurrent modification",
                     clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
-            log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
-                    e);
+            log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}",
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2575,22 +2656,23 @@ public abstract class NamespacesBase extends AdminResource {
             policies.max_consumers_per_topic = maxConsumersPerTopic;
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-            log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}", clientAppId(),
-                    namespaceName, policies.max_consumers_per_topic);
+            log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}",
+                    clientAppId(), namespaceName, policies.max_consumers_per_topic);
 
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: does not exist",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}: concurrent modification",
+            log.warn("[{}] Failed to update maxConsumersPerTopic configuration for"
+                            + " namespace {}: concurrent modification",
                     clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
-            log.error("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}", clientAppId(), namespaceName,
-                    e);
+            log.error("[{}] Failed to update maxConsumersPerTopic configuration for namespace {}",
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2616,22 +2698,23 @@ public abstract class NamespacesBase extends AdminResource {
             policies.max_consumers_per_subscription = maxConsumersPerSubscription;
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-            log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}", clientAppId(),
-                    namespaceName, policies.max_consumers_per_subscription);
+            log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}",
+                    clientAppId(), namespaceName, policies.max_consumers_per_subscription);
 
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: does not exist",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: concurrent modification",
+            log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for"
+                            + " namespace {}: concurrent modification",
                     clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
-            log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}", clientAppId(), namespaceName,
-                    e);
+            log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}",
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2657,22 +2740,24 @@ public abstract class NamespacesBase extends AdminResource {
             policies.max_unacked_messages_per_consumer = maxUnackedMessagesPerConsumer;
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-            log.info("[{}] Successfully updated maxUnackedMessagesPerConsumer configuration: namespace={}, value={}", clientAppId(),
-                    namespaceName, policies.max_unacked_messages_per_consumer);
+            log.info("[{}] Successfully updated maxUnackedMessagesPerConsumer configuration: namespace={}, value={}",
+                    clientAppId(), namespaceName, policies.max_unacked_messages_per_consumer);
 
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration"
+                            + " for namespace {}: does not exist",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}: concurrent modification",
+            log.warn("[{}] Failed to update maxUnackedMessagesPerConsumer configuration"
+                            + " for namespace {}: concurrent modification",
                     clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
-            log.error("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}", clientAppId(), namespaceName,
-                    e);
+            log.error("[{}] Failed to update maxUnackedMessagesPerConsumer configuration for namespace {}",
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2698,22 +2783,25 @@ public abstract class NamespacesBase extends AdminResource {
             policies.max_unacked_messages_per_subscription = maxUnackedMessagesPerSubscription;
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-            log.info("[{}] Successfully updated maxUnackedMessagesPerSubscription configuration: namespace={}, value={}", clientAppId(),
-                    namespaceName, policies.max_unacked_messages_per_subscription);
+            log.info("[{}] Successfully updated maxUnackedMessagesPerSubscription"
+                            + " configuration: namespace={}, value={}",
+                    clientAppId(), namespaceName, policies.max_unacked_messages_per_subscription);
 
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for"
+                            + " namespace {}: does not exist",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}: concurrent modification",
+            log.warn("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for"
+                            + " namespace {}: concurrent modification",
                     clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
-            log.error("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}", clientAppId(), namespaceName,
-                    e);
+            log.error("[{}] Failed to update maxUnackedMessagesPerSubscription configuration for namespace {}",
+                    clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
@@ -2747,8 +2835,9 @@ public abstract class NamespacesBase extends AdminResource {
                      clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update compactionThreshold configuration for namespace {}: concurrent modification",
-                     clientAppId(), namespaceName);
+            log.warn("[{}] Failed to update compactionThreshold configuration for"
+                            + " namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
@@ -2843,8 +2932,9 @@ public abstract class NamespacesBase extends AdminResource {
                      clientAppId(), namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update offloadDeletionLagMs configuration for namespace {}: concurrent modification",
-                     clientAppId(), namespaceName);
+            log.warn("[{}] Failed to update offloadDeletionLagMs configuration for"
+                            + " namespace {}: concurrent modification",
+                    clientAppId(), namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
@@ -2857,15 +2947,17 @@ public abstract class NamespacesBase extends AdminResource {
 
     @Deprecated
     protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
     }
 
     protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.READ);
         Policies policies = getNamespacePolicies(namespaceName);
         SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
-        if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED){
+        if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
             schemaCompatibilityStrategy = SchemaCompatibilityStrategy
                     .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
         }
@@ -2874,18 +2966,20 @@ public abstract class NamespacesBase extends AdminResource {
 
     @Deprecated
     protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
-                policies.schema_auto_update_compatibility_strategy = strategy;
-                return policies;
-            }, (policies) -> policies.schema_auto_update_compatibility_strategy,
-            "schemaAutoUpdateCompatibilityStrategy");
+                    policies.schema_auto_update_compatibility_strategy = strategy;
+                    return policies;
+                }, (policies) -> policies.schema_auto_update_compatibility_strategy,
+                "schemaAutoUpdateCompatibilityStrategy");
     }
 
     protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
@@ -2896,28 +2990,32 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected boolean internalGetSchemaValidationEnforced() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).schema_validation_enforced;
     }
 
     protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
-                policies. schema_validation_enforced = schemaValidationEnforced;
-                return policies;
-            }, (policies) -> policies. schema_validation_enforced,
-            "schemaValidationEnforced");
+                    policies.schema_validation_enforced = schemaValidationEnforced;
+                    return policies;
+                }, (policies) -> policies.schema_validation_enforced,
+                "schemaValidationEnforced");
     }
 
     protected boolean internalGetIsAllowAutoUpdateSchema() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).is_allow_auto_update_schema;
     }
 
     protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         mutatePolicy((policies) -> {
@@ -2988,8 +3086,8 @@ public abstract class NamespacesBase extends AdminResource {
                     (rc, path1, ctx, stat) -> {
                         if (rc == KeeperException.Code.OK.intValue()) {
                             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-                            log.info("[{}] Successfully updated offload configuration: namespace={}, map={}", clientAppId(),
-                                    namespaceName, updatedOffloadPolicies);
+                            log.info("[{}] Successfully updated offload configuration: namespace={}, map={}",
+                                    clientAppId(), namespaceName, updatedOffloadPolicies);
                             asyncResponse.resume(Response.noContent().build());
                         } else {
                             String errorMsg = String.format(
@@ -3061,8 +3159,8 @@ public abstract class NamespacesBase extends AdminResource {
                     "The offloadPolicies must be specified for namespace offload.");
         }
         if (!offloadPolicies.driverSupported()) {
-            log.warn("[{}] Failed to update offload configuration for namespace {}: " +
-                            "driver is not supported, support value: {}",
+            log.warn("[{}] Failed to update offload configuration for namespace {}: "
+                            + "driver is not supported, support value: {}",
                     clientAppId(), namespaceName, OffloadPolicies.getSupportedDriverNames());
             throw new RestException(Status.PRECONDITION_FAILED,
                     "The driver is not supported, support value: " + OffloadPolicies.getSupportedDriverNames());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c0cb9ef..81f35cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -482,7 +482,8 @@ public class PersistentTopicsBase extends AdminResource {
      *
      * @param numPartitions
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly, boolean authoritative) {
+    protected void internalUpdatePartitionedTopic(int numPartitions,
+                                                  boolean updateLocalTopicOnly, boolean authoritative) {
         validateWriteOperationOnTopic(authoritative);
         // Only do the validation if it's the first hop.
         if (!updateLocalTopicOnly) {
@@ -490,7 +491,8 @@ public class PersistentTopicsBase extends AdminResource {
         }
         final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
+            throw new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be less than or equal to " + maxPartitions);
         }
 
         if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
@@ -574,13 +576,15 @@ public class PersistentTopicsBase extends AdminResource {
                 });
             }
         }).exceptionally(e -> {
-            log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+            log.error("[{}] Failed to create partitions for topic {}",
+                    clientAppId(), topicName);
             resumeAsyncResponseExceptionally(asyncResponse, e);
             return null;
         });
     }
 
-    protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse, DelayedDeliveryPolicies deliveryPolicies) {
+    protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse,
+                                                      DelayedDeliveryPolicies deliveryPolicies) {
         TopicPolicies topicPolicies = null;
         try {
             topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
@@ -593,7 +597,8 @@ public class PersistentTopicsBase extends AdminResource {
             topicPolicies = new TopicPolicies();
         }
         topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
-        topicPolicies.setDelayedDeliveryTickTimeMillis(deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
+        topicPolicies.setDelayedDeliveryTickTimeMillis(
+                deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
         pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
                 .whenComplete((result, ex) -> {
                     if (ex != null) {
@@ -606,19 +611,22 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
-        List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() -1);
+        List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() - 1);
         clusters.forEach(cluster -> {
             if (cluster.equals(pulsar().getConfig().getClusterName())) {
                 return;
             }
             results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()
-                    .updatePartitionedTopicAsync(topicName.toString(), numPartitions, true));
+                    .updatePartitionedTopicAsync(topicName.toString(),
+                            numPartitions, true));
         });
         return FutureUtil.waitForAll(results);
     }
 
-    protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative, boolean checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
+    protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
+                                                                      boolean checkAllowAutoCreation) {
+        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
+                authoritative, checkAllowAutoCreation);
         if (metadata.partitions > 1) {
             validateClientVersion();
         }
@@ -661,13 +669,16 @@ public class PersistentTopicsBase extends AdminResource {
                 for (int i = 0; i < numPartitions; i++) {
                     TopicName topicNamePartition = topicName.getPartition(i);
                     try {
-                        pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString(), force)
+                        pulsar().getAdminClient().topics()
+                                .deleteAsync(topicNamePartition.toString(), force)
                                 .whenComplete((r, ex) -> {
                                     if (ex != null) {
                                         if (ex instanceof NotFoundException) {
                                             // if the sub-topic is not found, the client might not have called create
-                                            // producer or it might have been deleted earlier, so we ignore the 404 error.
-                                            // For all other exception, we fail the delete partition method even if a single
+                                            // producer or it might have been deleted earlier,
+                                            //so we ignore the 404 error.
+                                            // For all other exception,
+                                            //we fail the delete partition method even if a single
                                             // partition is failed to be deleted
                                             if (log.isDebugEnabled()) {
                                                 log.debug("[{}] Partition not found: {}", clientAppId(),
@@ -730,8 +741,10 @@ public class PersistentTopicsBase extends AdminResource {
                                 log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
                                 asyncResponse.resume(Response.noContent().build());
                             } else {
-                                log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
-                                asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+                                log.error("[{}] Failed to delete partitioned topic {}", clientAppId(),
+                                        topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+                                asyncResponse.resume(new RestException(
+                                        KeeperException.create(KeeperException.Code.get(rc2))));
                             }
                         }, null);
                     } catch (Exception e) {
@@ -745,7 +758,8 @@ public class PersistentTopicsBase extends AdminResource {
                             topicName);
                     asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
                 } else {
-                    log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+                    log.error("[{}] Failed to delete partitioned topic {}", clientAppId(),
+                            topicName, KeeperException.create(KeeperException.Code.get(rc)));
                     asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
                 }
             }, null);
@@ -857,7 +871,8 @@ public class PersistentTopicsBase extends AdminResource {
             topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
         } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
             log.error("Topic {} policies cache have not init.", topicName);
-            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Policies cache have not init"));
         }
         if (topicPolicies == null) {
             topicPolicies = new TopicPolicies();
@@ -866,30 +881,33 @@ public class PersistentTopicsBase extends AdminResource {
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
     }
 
-    private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName topicName) {
+    private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies,
+                                                                  TopicName topicName) {
         return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
                 .thenAccept(optionalTopic -> {
-            try {
-                if (!optionalTopic.isPresent() || !topicName.isPersistent()) {
-                    return;
-                }
-                PersistentTopic persistentTopic = (PersistentTopic) optionalTopic.get();
-                ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
-                if (offloadPolicies == null) {
-                    LedgerOffloader namespaceOffloader = pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
-                    LedgerOffloader topicOffloader = managedLedgerConfig.getLedgerOffloader();
-                    if (topicOffloader != null && topicOffloader != namespaceOffloader) {
-                        topicOffloader.close();
+                    try {
+                        if (!optionalTopic.isPresent() || !topicName.isPersistent()) {
+                            return;
+                        }
+                        PersistentTopic persistentTopic = (PersistentTopic) optionalTopic.get();
+                        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+                        if (offloadPolicies == null) {
+                            LedgerOffloader namespaceOffloader =
+                                    pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
+                            LedgerOffloader topicOffloader = managedLedgerConfig.getLedgerOffloader();
+                            if (topicOffloader != null && topicOffloader != namespaceOffloader) {
+                                topicOffloader.close();
+                            }
+                            managedLedgerConfig.setLedgerOffloader(namespaceOffloader);
+                        } else {
+                            managedLedgerConfig.setLedgerOffloader(
+                                    pulsar().createManagedLedgerOffloader(offloadPolicies));
+                        }
+                        persistentTopic.getManagedLedger().setConfig(managedLedgerConfig);
+                    } catch (PulsarServerException e) {
+                        throw new RestException(e);
                     }
-                    managedLedgerConfig.setLedgerOffloader(namespaceOffloader);
-                } else {
-                    managedLedgerConfig.setLedgerOffloader(pulsar().createManagedLedgerOffloader(offloadPolicies));
-                }
-                persistentTopic.getManagedLedger().setConfig(managedLedgerConfig);
-            } catch (PulsarServerException e) {
-                throw new RestException(e);
-            }
-        });
+                });
     }
 
     protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
@@ -923,7 +941,8 @@ public class PersistentTopicsBase extends AdminResource {
             topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
         } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
             log.error("Topic {} policies cache have not init.", topicName);
-            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
+            return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                    "Policies cache have not init"));
         }
         if (topicPolicies == null) {
             topicPolicies = new TopicPolicies();
@@ -1006,10 +1025,12 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isPartitioned()) {
             internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            getPartitionedTopicMetadataAsync(topicName, authoritative,
+                    false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions > 0) {
                     try {
-                        // get the subscriptions only from the 1st partition since all the other partitions will have the same
+                        // get the subscriptions only from the 1st partition
+                        // since all the other partitions will have the same
                         // subscriptions
                         pulsar().getAdminClient().topics().getSubscriptionsAsync(topicName.getPartition(0).toString())
                                 .whenComplete((r, ex) -> {
@@ -1060,7 +1081,8 @@ public class PersistentTopicsBase extends AdminResource {
             asyncResponse.resume(subscriptions);
         } catch (WebApplicationException wae) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}, redirecting to other brokers.",
+                log.debug("[{}] Failed to get subscriptions for non-partitioned topic {},"
+                                + " redirecting to other brokers.",
                         clientAppId(), topicName, wae);
             }
             resumeAsyncResponseExceptionally(asyncResponse, wae);
@@ -1092,7 +1114,8 @@ public class PersistentTopicsBase extends AdminResource {
             boolean includeMetadata = metadata && hasSuperUserAccess();
             return topic.getInternalStats(includeMetadata).get();
         } catch (Exception e) {
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
+            throw new RestException(Status.INTERNAL_SERVER_ERROR,
+                    (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
         }
     }
 
@@ -1111,7 +1134,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isPartitioned()) {
             internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions > 0) {
                     final List<CompletableFuture<JsonObject>> futures = Lists.newArrayList();
 
@@ -1121,20 +1145,24 @@ public class PersistentTopicsBase extends AdminResource {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             futures.add(pulsar().getAdminClient().topics()
-                                .getInternalInfoAsync(topicNamePartition.toString()).whenComplete((jsonObject, throwable) -> {
-                                    if(throwable != null) {
-                                        log.error("[{}] Failed to get managed info for {}", clientAppId(), topicNamePartition, throwable);
-                                        asyncResponse.resume(new RestException(throwable));
-                                    }
-                                    Gson gson = new GsonBuilder().setPrettyPrinting().create();
-                                    try {
-                                        partitionedManagedLedgerInfo.partitions.put(topicNamePartition.toString(),
-                                            jsonMapper().readValue(gson.toJson(jsonObject), ManagedLedgerInfo.class));
-                                    } catch (JsonProcessingException ex) {
-                                        log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]", clientAppId(),
-                                            topicNamePartition, gson.toJson(jsonObject), ex);
-                                    }
-                                }));
+                                    .getInternalInfoAsync(
+                                            topicNamePartition.toString()).whenComplete((jsonObject, throwable) -> {
+                                        if (throwable != null) {
+                                            log.error("[{}] Failed to get managed info for {}",
+                                                    clientAppId(), topicNamePartition, throwable);
+                                            asyncResponse.resume(new RestException(throwable));
+                                        }
+                                        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+                                        try {
+                                            partitionedManagedLedgerInfo.partitions.put(topicNamePartition.toString(),
+                                                    jsonMapper().readValue(gson.toJson(jsonObject),
+                                                            ManagedLedgerInfo.class));
+                                        } catch (JsonProcessingException ex) {
+                                            log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]",
+                                                    clientAppId(),
+                                                    topicNamePartition, gson.toJson(jsonObject), ex);
+                                        }
+                                    }));
                         } catch (Exception e) {
                             log.error("[{}] Failed to get managed info for {}", clientAppId(), topicNamePartition, e);
                             throw new RestException(e);
@@ -1148,7 +1176,7 @@ public class PersistentTopicsBase extends AdminResource {
                                 asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
                             } else {
                                 log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, t);
-                                asyncResponse.resume( new RestException(t));
+                                asyncResponse.resume(new RestException(t));
                             }
                         }
                         asyncResponse.resume((StreamingOutput) output -> {
@@ -1203,7 +1231,8 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
         }
-        getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+        getPartitionedTopicMetadataAsync(topicName,
+                authoritative, false).thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
                 return;
@@ -1213,7 +1242,8 @@ public class PersistentTopicsBase extends AdminResource {
             for (int i = 0; i < partitionMetadata.partitions; i++) {
                 try {
                     topicStatsFutureList
-                            .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog));
+                            .add(pulsar().getAdminClient().topics().getStatsAsync(
+                                    (topicName.getPartition(i).toString()), getPreciseBacklog));
                 } catch (PulsarServerException e) {
                     asyncResponse.resume(new RestException(e));
                     return;
@@ -1244,7 +1274,8 @@ public class PersistentTopicsBase extends AdminResource {
                             stats.partitions.put(topicName.toString(), new TopicStats());
                         } else {
                             asyncResponse.resume(
-                                    new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+                                    new RestException(Status.NOT_FOUND,
+                                            "Internal topics have not been generated yet"));
                             return null;
                         }
                     } catch (KeeperException | InterruptedException e) {
@@ -1272,7 +1303,8 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
         }
-        getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+        getPartitionedTopicMetadataAsync(topicName,
+                authoritative, false).thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
                 return;
@@ -1315,7 +1347,8 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
-    protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean force) {
+    protected void internalDeleteSubscription(AsyncResponse asyncResponse,
+                                              String subName, boolean authoritative, boolean force) {
         if (force) {
             internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative);
         } else {
@@ -1337,7 +1370,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isPartitioned()) {
             internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions > 0) {
                     final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -1347,7 +1381,8 @@ public class PersistentTopicsBase extends AdminResource {
                             futures.add(pulsar().getAdminClient().topics()
                                     .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
                         } catch (Exception e) {
-                            log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName,
+                            log.error("[{}] Failed to delete subscription {} {}",
+                                    clientAppId(), topicNamePartition, subName,
                                     e);
                             asyncResponse.resume(new RestException(e));
                             return;
@@ -1365,7 +1400,8 @@ public class PersistentTopicsBase extends AdminResource {
                                         "Subscription has active connected consumers"));
                                 return null;
                             } else {
-                                log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t);
+                                log.error("[{}] Failed to delete subscription {} {}",
+                                        clientAppId(), topicName, subName, t);
                                 asyncResponse.resume(new RestException(t));
                                 return null;
                             }
@@ -1378,14 +1414,16 @@ public class PersistentTopicsBase extends AdminResource {
                     internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
                 }
             }).exceptionally(ex -> {
-                log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, ex);
+                log.error("[{}] Failed to delete subscription {} from topic {}",
+                        clientAppId(), subName, topicName, ex);
                 resumeAsyncResponseExceptionally(asyncResponse, ex);
                 return null;
             });
         }
     }
 
-    private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+    private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                  String subName, boolean authoritative) {
         try {
             validateAdminAccessForSubscriber(subName, authoritative);
             Topic topic = getTopicReference(topicName);
@@ -1415,12 +1453,14 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+    protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
+                                                        String subName, boolean authoritative) {
         if (topicName.isGlobal()) {
             try {
                 validateGlobalNamespaceOwnership(namespaceName);
             } catch (Exception e) {
-                log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e);
+                log.error("[{}] Failed to delete subscription forcefully {} from topic {}",
+                        clientAppId(), subName, topicName, e);
                 resumeAsyncResponseExceptionally(asyncResponse, e);
                 return;
             }
@@ -1429,7 +1469,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isPartitioned()) {
             internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions > 0) {
                     final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -1437,10 +1478,11 @@ public class PersistentTopicsBase extends AdminResource {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             futures.add(pulsar().getAdminClient().topics()
-                                .deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
+                                    .deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
                         } catch (Exception e) {
-                            log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicNamePartition, subName,
-                                e);
+                            log.error("[{}] Failed to delete subscription forcefully {} {}",
+                                    clientAppId(), topicNamePartition, subName,
+                                    e);
                             asyncResponse.resume(new RestException(e));
                             return;
                         }
@@ -1453,7 +1495,8 @@ public class PersistentTopicsBase extends AdminResource {
                                 asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
                                 return null;
                             } else {
-                                log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, t);
+                                log.error("[{}] Failed to delete subscription forcefully {} {}",
+                                        clientAppId(), topicName, subName, t);
                                 asyncResponse.resume(new RestException(t));
                                 return null;
                             }
@@ -1466,14 +1509,16 @@ public class PersistentTopicsBase extends AdminResource {
                     internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
                 }
             }).exceptionally(ex -> {
-                log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, ex);
+                log.error("[{}] Failed to delete subscription forcefully {} from topic {}",
+                        clientAppId(), subName, topicName, ex);
                 resumeAsyncResponseExceptionally(asyncResponse, ex);
                 return null;
             });
         }
     }
 
-    private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+    private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse,
+                                                                            String subName, boolean authoritative) {
         try {
             validateAdminAccessForSubscriber(subName, authoritative);
             Topic topic = getTopicReference(topicName);
@@ -1488,12 +1533,14 @@ public class PersistentTopicsBase extends AdminResource {
         } catch (Exception e) {
             if (e instanceof WebApplicationException) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Failed to delete subscription forcefully from topic {}, redirecting to other brokers.",
+                    log.debug("[{}] Failed to delete subscription forcefully from topic {},"
+                                    + " redirecting to other brokers.",
                             clientAppId(), topicName, e);
                 }
                 asyncResponse.resume(e);
             } else {
-                log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e);
+                log.error("[{}] Failed to delete subscription forcefully {} {}",
+                        clientAppId(), topicName, subName, e);
                 asyncResponse.resume(new RestException(e));
             }
         }
@@ -1504,7 +1551,8 @@ public class PersistentTopicsBase extends AdminResource {
             try {
                 validateGlobalNamespaceOwnership(namespaceName);
             } catch (Exception e) {
-                log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
+                log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
+                        clientAppId(), subName, topicName, e);
                 resumeAsyncResponseExceptionally(asyncResponse, e);
                 return;
             }
@@ -1513,17 +1561,22 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isPartitioned()) {
             internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions > 0) {
                     final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
                     for (int i = 0; i < partitionMetadata.partitions; i++) {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
-                            futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(),
-                                    subName));
+                            futures.add(pulsar()
+                                    .getAdminClient()
+                                    .topics()
+                                    .skipAllMessagesAsync(topicNamePartition.toString(),
+                                            subName));
                         } catch (Exception e) {
-                            log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e);
+                            log.error("[{}] Failed to skip all messages {} {}",
+                                    clientAppId(), topicNamePartition, subName, e);
                             asyncResponse.resume(new RestException(e));
                             return;
                         }
@@ -1536,7 +1589,8 @@ public class PersistentTopicsBase extends AdminResource {
                                 asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
                                 return null;
                             } else {
-                                log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t);
+                                log.error("[{}] Failed to skip all messages {} {}",
+                                        clientAppId(), topicName, subName, t);
                                 asyncResponse.resume(new RestException(t));
                                 return null;
                             }
@@ -1549,14 +1603,16 @@ public class PersistentTopicsBase extends AdminResource {
                     internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, subName, authoritative);
                 }
             }).exceptionally(ex -> {
-                log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex);
+                log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
+                        clientAppId(), subName, topicName, ex);
                 resumeAsyncResponseExceptionally(asyncResponse, ex);
-               return null;
+                return null;
             });
         }
     }
 
-    private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+    private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                               String subName, boolean authoritative) {
         try {
             validateAdminAccessForSubscriber(subName, authoritative);
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
@@ -1587,12 +1643,14 @@ public class PersistentTopicsBase extends AdminResource {
             }
         } catch (WebApplicationException wae) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to skip all messages for subscription on topic {}, redirecting to other brokers.",
+                log.debug("[{}] Failed to skip all messages for subscription on topic {},"
+                                + " redirecting to other brokers.",
                         clientAppId(), topicName, wae);
             }
             resumeAsyncResponseExceptionally(asyncResponse, wae);
         } catch (Exception e) {
-            log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e);
+            log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
+                    clientAppId(), subName, topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
@@ -1601,7 +1659,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName,
+                authoritative, false);
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
         }
@@ -1634,16 +1693,19 @@ public class PersistentTopicsBase extends AdminResource {
             try {
                 validateGlobalNamespaceOwnership(namespaceName);
             } catch (Exception e) {
-                log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+                log.error("[{}] Failed to expire messages for all subscription on topic {}",
+                        clientAppId(), topicName, e);
                 resumeAsyncResponseExceptionally(asyncResponse, e);
                 return;
             }
         }
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
-            internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
+            internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                    expireTimeInSeconds, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, false).thenAccept(partitionMetadata -> {
                 if (partitionMetadata.partitions > 0) {
                     final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -1651,10 +1713,14 @@ public class PersistentTopicsBase extends AdminResource {
                     for (int i = 0; i < partitionMetadata.partitions; i++) {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
-                            futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(
-                                    topicNamePartition.toString(), expireTimeInSeconds));
+                            futures.add(pulsar()
+                                    .getAdminClient()
+                                    .topics()
+                                    .expireMessagesForAllSubscriptionsAsync(
+                                            topicNamePartition.toString(), expireTimeInSeconds));
                         } catch (Exception e) {
-                            log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
+                            log.error("[{}] Failed to expire messages up to {} on {}",
+                                    clientAppId(), expireTimeInSeconds,
                                     topicNamePartition, e);
                             asyncResponse.resume(new RestException(e));
                             return;
@@ -1664,7 +1730,8 @@ public class PersistentTopicsBase extends AdminResource {
                     FutureUtil.waitForAll(futures).handle((result, exception) -> {
                         if (exception != null) {
                             Throwable t = exception.getCause();
-                            log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
+                            log.error("[{}] Failed to expire messages up to {} on {}",
+                                    clientAppId(), expireTimeInSeconds,
                                     topicName, t);
                             asyncResponse.resume(new RestException(t));
                             return null;
@@ -1674,18 +1741,21 @@ public class PersistentTopicsBase extends AdminResource {
                         return null;
                     });
                 } else {
-                    internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, expireTimeInSeconds, authoritative);
+                    internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse,
+                            expireTimeInSeconds, authoritative);
                 }
             }).exceptionally(ex -> {
-                log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex);
+                log.error("[{}] Failed to expire messages for all subscription on topic {}",
+                        clientAppId(), topicName, ex);
                 resumeAsyncResponseExceptionally(asyncResponse, ex);
                 return null;
             });
         }
     }
 
-    private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, int expireTimeInSeconds,
-            boolean authoritative) {
+    private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                                 int expireTimeInSeconds,
+                                                                                 boolean authoritative) {
         // validate ownership and redirect if current broker is not owner
         PersistentTopic topic;
         try {
@@ -1694,13 +1764,15 @@ public class PersistentTopicsBase extends AdminResource {
             topic = (PersistentTopic) getTopicReference(topicName);
         } catch (WebApplicationException wae) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Failed to expire messages for all subscription on topic {}, redirecting to other brokers.",
+                log.debug("[{}] Failed to expire messages for all subscription on topic {},"
+                                + " redirecting to other brokers.",
                         clientAppId(), topicName, wae);
             }
             resumeAsyncResponseExceptionally(asyncResponse, wae);
             return;
         } catch (Exception e) {
-            log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e);
+            log.error("[{}] Failed to expire messages for all subscription on topic {}",
+                    clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
             return;
         }
@@ -1742,7 +1814,8 @@ public class PersistentTopicsBase extends AdminResource {
             try {
                 validateGlobalNamespaceOwnership(namespaceName);
             } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}", clientAppId(), topicName,
+                log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}",
+                        clientAppId(), topicName,
                         subName, timestamp, e.getMessage());
                 resumeAsyncResponseExceptionally(asyncResponse, e);
                 return;
@@ -1752,7 +1825,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isPartitioned()) {
             internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, false).thenAccept(partitionMetadata -> {
                 final int numPartitions = partitionMetadata.partitions;
                 if (numPartitions > 0) {
                     final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1764,7 +1838,8 @@ public class PersistentTopicsBase extends AdminResource {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
+                                    .resetCursorAsync(topicNamePartition.toString(),
+                                            subName, timestamp).handle((r, ex) -> {
                                 if (ex != null) {
                                     if (ex instanceof PreconditionFailedException) {
                                         // throw the last exception if all partitions get this error
@@ -1805,14 +1880,16 @@ public class PersistentTopicsBase extends AdminResource {
 
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
-                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
+                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                    clientAppId(), topicName,
                                     subName, timestamp, partitionException.get());
                             asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage()));
+                                    new RestException(Status.PRECONDITION_FAILED,
+                                            partitionException.get().getMessage()));
                             return;
                         } else if (failureCount.get() > 0) {
-                            log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicName, subName, timestamp, partitionException.get());
+                            log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
+                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
 
                         asyncResponse.resume(Response.noContent().build());
@@ -1821,7 +1898,8 @@ public class PersistentTopicsBase extends AdminResource {
                     internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
                 }
             }).exceptionally(ex -> {
-                log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex);
+                log.error("[{}] Failed to expire messages for all subscription on topic {}",
+                        clientAppId(), topicName, ex);
                 resumeAsyncResponseExceptionally(asyncResponse, ex);
                 return null;
             });
@@ -1832,8 +1910,8 @@ public class PersistentTopicsBase extends AdminResource {
                                        boolean authoritative) {
         try {
             validateAdminAccessForSubscriber(subName, authoritative);
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                    timestamp);
+            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+                    clientAppId(), topicName, subName, timestamp);
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
             if (topic == null) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
@@ -1864,8 +1942,8 @@ public class PersistentTopicsBase extends AdminResource {
                 return null;
             });
         } catch (Exception e) {
-            log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                    timestamp, e);
+            log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}",
+                    clientAppId(), topicName, subName, timestamp, e);
             if (e instanceof NotAllowedException) {
                 asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
             } else {
@@ -1880,7 +1958,8 @@ public class PersistentTopicsBase extends AdminResource {
             try {
                 validateGlobalNamespaceOwnership(namespaceName);
             } catch (Exception e) {
-                log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, e);
+                log.error("[{}] Failed to create subscription {} on topic {}",
+                        clientAppId(), subscriptionName, topicName, e);
                 resumeAsyncResponseExceptionally(asyncResponse, e);
                 return;
             }
@@ -1890,10 +1969,12 @@ public class PersistentTopicsBase extends AdminResource {
                 targetMessageId);
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
-            internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
+            internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
+                    subscriptionName, targetMessageId, authoritative, replicated);
         } else {
             boolean allowAutoTopicCreation = pulsar().getConfiguration().isAllowAutoTopicCreation();
-            getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
                 final int numPartitions = partitionMetadata.partitions;
                 if (numPartitions > 0) {
                     final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1906,10 +1987,12 @@ public class PersistentTopicsBase extends AdminResource {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId)
+                                    .createSubscriptionAsync(topicNamePartition.toString(),
+                                            subscriptionName, targetMessageId)
                                     .handle((r, ex) -> {
                                         if (ex != null) {
-                                            // fail the operation on unknown exception or if all the partitioned failed due to
+                                            // fail the operation on unknown exception or
+                                            // if all the partitioned failed due to
                                             // subscription-already-exist
                                             if (failureCount.incrementAndGet() == numPartitions
                                                     || !(ex instanceof PulsarAdminException.ConflictException)) {
@@ -1942,10 +2025,12 @@ public class PersistentTopicsBase extends AdminResource {
                         }
 
                         if (partitionException.get() != null) {
-                            log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
+                            log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
+                                    clientAppId(), topicName,
                                     subscriptionName, targetMessageId, partitionException.get());
                             if (partitionException.get() instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) partitionException.get()));
+                                asyncResponse.resume(
+                                        new RestException((PulsarAdminException) partitionException.get()));
                                 return;
                             } else {
                                 asyncResponse.resume(new RestException(partitionException.get()));
@@ -1956,18 +2041,21 @@ public class PersistentTopicsBase extends AdminResource {
                         asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
+                    internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
+                            subscriptionName, targetMessageId, authoritative, replicated);
                 }
             }).exceptionally(ex -> {
-                log.error("[{}] Failed to create subscription {} on topic {}", clientAppId(), subscriptionName, topicName, ex);
+                log.error("[{}] Failed to create subscription {} on topic {}",
+                        clientAppId(), subscriptionName, topicName, ex);
                 resumeAsyncResponseExceptionally(asyncResponse, ex);
                 return null;
             });
         }
     }
 
-    private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subscriptionName,
-              MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
+    private void internalCreateSubscriptionForNonPartitionedTopic(
+            AsyncResponse asyncResponse, String subscriptionName,
+            MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
         try {
             validateAdminAccessForSubscriber(subscriptionName, authoritative);
 
@@ -2051,10 +2139,12 @@ public class PersistentTopicsBase extends AdminResource {
                 if (batchIndex >= 0) {
                     try {
                         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
-                        ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(), messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
+                        ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
+                                messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
                             @Override
                             public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                                // Since we can't read the message from the storage layer, it might be an already delete message ID or an invalid message ID
+                                // Since we can't read the message from the storage layer,
+                                // it might be an already delete message ID or an invalid message ID
                                 // We should fall back to non batch index seek.
                                 batchSizeFuture.complete(0);
                             }
@@ -2066,7 +2156,8 @@ public class PersistentTopicsBase extends AdminResource {
                                         if (entry == null) {
                                             batchSizeFuture.complete(0);
                                         } else {
-                                            MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
+                                            MessageMetadata metadata =
+                                                    Commands.parseMessageMetadata(entry.getDataBuffer());
                                             batchSizeFuture.complete(metadata.getNumMessagesInBatch());
                                         }
                                     } catch (Exception e) {
@@ -2099,7 +2190,8 @@ public class PersistentTopicsBase extends AdminResource {
                             bitSet.clear(0, Math.max(batchIndex + 1, 0));
                             if (bitSet.length() > 0) {
                                 ackSet = bitSet.toLongArray();
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
+                                seekPosition = PositionImpl.get(messageId.getLedgerId(),
+                                        messageId.getEntryId(), ackSet);
                             } else {
                                 seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
                                 seekPosition = seekPosition.getNext();
@@ -2108,7 +2200,8 @@ public class PersistentTopicsBase extends AdminResource {
                             if (batchIndex - 1 >= 0) {
                                 bitSet.clear(0, batchIndex);
                                 ackSet = bitSet.toLongArray();
-                                seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
+                                seekPosition = PositionImpl.get(messageId.getLedgerId(),
+                                        messageId.getEntryId(), ackSet);
                             } else {
                                 seekPosition = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
                             }
@@ -2188,7 +2281,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
         verifyReadOperation(authoritative);
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+                authoritative, false).partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
         }
         validateAdminAccessForSubscriber(subName, authoritative);
@@ -2233,14 +2327,17 @@ public class PersistentTopicsBase extends AdminResource {
             validateGlobalNamespaceOwnership(namespaceName);
         }
 
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Examine messages on a partitioned topic is not allowed, " +
-                    "please try examine message on specific topic partition");
+        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+                authoritative, false).partitions > 0) {
+            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Examine messages on a partitioned topic is not allowed, "
+                            + "please try examine message on specific topic partition");
         }
         validateTopicOwnership(topicName, authoritative);
         if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
             log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Examine messages on a non-persistent topic is not allowed");
+            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Examine messages on a non-persistent topic is not allowed");
         }
 
         if (messagePosition < 1) {
@@ -2255,7 +2352,8 @@ public class PersistentTopicsBase extends AdminResource {
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
             long totalMessage = topic.getNumberOfEntries();
             PositionImpl startPosition = topic.getFirstPosition();
-            long messageToSkip = initialPosition.equals("earliest")? messagePosition : totalMessage - messagePosition + 1;
+            long messageToSkip =
+                    initialPosition.equals("earliest") ? messagePosition : totalMessage - messagePosition + 1;
             CompletableFuture<Entry> future = new CompletableFuture<>();
             PositionImpl readPosition = topic.getPositionAfterN(startPosition, messageToSkip);
             topic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
@@ -2376,7 +2474,8 @@ public class PersistentTopicsBase extends AdminResource {
         return offlineTopicStats;
     }
 
-    protected void internalSetBacklogQuota(AsyncResponse asyncResponse, BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
+    protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
+                                           BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
         if (topicName.isGlobal()) {
@@ -2399,25 +2498,25 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
-        if(!checkBacklogQuota(backlogQuota,retentionPolicies)){
+        if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
             log.warn(
                     "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
                     clientAppId(), topicName);
             asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                    "Backlog Quota exceeds configured retention quota for topic. " +
-                            "Please increase retention quota and retry"));
+                    "Backlog Quota exceeds configured retention quota for topic. "
+                            + "Please increase retention quota and retry"));
         }
 
-        if(backlogQuota != null){
+        if (backlogQuota != null) {
             topicPolicies.getBackLogQuotaMap().put(backlogQuotaType.name(), backlogQuota);
-        }else {
+        } else {
             topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
         }
         Map<String, BacklogQuota> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
         pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
                 .whenComplete((r, ex) -> {
                     if (ex != null) {
-                        log.error("Failed updated backlog quota map",ex);
+                        log.error("Failed updated backlog quota map", ex);
                         asyncResponse.resume(new RestException(ex));
                     } else {
                         try {
@@ -2474,7 +2573,7 @@ public class PersistentTopicsBase extends AdminResource {
         pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
                 .whenComplete((result, ex) -> {
                     if (ex != null) {
-                        log.error("Failed set message ttl for topic",ex);
+                        log.error("Failed set message ttl for topic", ex);
                         asyncResponse.resume(new RestException(ex));
                     } else {
                         log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
@@ -2518,7 +2617,7 @@ public class PersistentTopicsBase extends AdminResource {
                 .map(TopicPolicies::getRetentionPolicies);
         if (!retention.isPresent()) {
             asyncResponse.resume(Response.noContent().build());
-        }else {
+        } else {
             asyncResponse.resume(retention.get());
         }
     }
@@ -2537,17 +2636,17 @@ public class PersistentTopicsBase extends AdminResource {
                 .orElseGet(TopicPolicies::new);
         BacklogQuota backlogQuota =
                     topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
-        if (backlogQuota == null){
+        if (backlogQuota == null) {
             Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
             backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
         }
-        if(!checkBacklogQuota(backlogQuota, retention)){
+        if (!checkBacklogQuota(backlogQuota, retention)) {
             log.warn(
                     "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
                     clientAppId(), topicName);
             throw new RestException(Status.PRECONDITION_FAILED,
-                    "Retention Quota must exceed configured backlog quota for topic. " +
-                            "Please increase retention quota and retry");
+                    "Retention Quota must exceed configured backlog quota for topic. "
+                            + "Please increase retention quota and retry");
         }
         topicPolicies.setRetentionPolicies(retention);
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
@@ -2610,8 +2709,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize) {
         if (maxMessageSize != null && (maxMessageSize < 0 || maxMessageSize > config().getMaxMessageSize())) {
             throw new RestException(Status.PRECONDITION_FAILED
-                    , "topic-level maxMessageSize must be greater than or equal to 0 " +
-                    "and must be smaller than that in the broker-level");
+                    , "topic-level maxMessageSize must be greater than or equal to 0 "
+                    + "and must be smaller than that in the broker-level");
         }
 
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -2757,12 +2856,13 @@ public class PersistentTopicsBase extends AdminResource {
             try {
               futures.add(pulsar().getAdminClient().topics()
                   .terminateTopicAsync(topicNamePartition.toString()).whenComplete((messageId, throwable) -> {
-                      if(throwable != null) {
-                          log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition, throwable);
-                          asyncResponse.resume(new RestException(throwable));
-                      }
-                      messageIds.add(messageId);
-                  }));
+                          if (throwable != null) {
+                              log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition,
+                                      throwable);
+                              asyncResponse.resume(new RestException(throwable));
+                          }
+                          messageIds.add(messageId);
+                      }));
             } catch (Exception e) {
               log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition, e);
               throw new RestException(e);
@@ -2774,8 +2874,8 @@ public class PersistentTopicsBase extends AdminResource {
               if (t instanceof NotFoundException) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
               } else {
-                log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t);
-                  asyncResponse.resume( new RestException(t));
+                  log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t);
+                  asyncResponse.resume(new RestException(t));
               }
             }
           asyncResponse.resume(messageIds);
@@ -2811,8 +2911,11 @@ public class PersistentTopicsBase extends AdminResource {
                 for (int i = 0; i < partitionMetadata.partitions; i++) {
                     TopicName topicNamePartition = topicName.getPartition(i);
                     try {
-                        futures.add(pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(),
-                            subName, expireTimeInSeconds));
+                        futures.add(pulsar()
+                                .getAdminClient()
+                                .topics()
+                                .expireMessagesAsync(topicNamePartition.toString(),
+                                        subName, expireTimeInSeconds));
                     } catch (Exception e) {
                         log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
                             topicNamePartition, e);
@@ -2828,8 +2931,9 @@ public class PersistentTopicsBase extends AdminResource {
                             asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
                             return null;
                         } else {
-                            log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds,
-                                topicName, t);
+                            log.error("[{}] Failed to expire messages up to {} on {}",
+                                    clientAppId(), expireTimeInSeconds,
+                                    topicName, t);
                             asyncResponse.resume(new RestException(t));
                             return null;
                         }
@@ -2928,9 +3032,13 @@ public class PersistentTopicsBase extends AdminResource {
                     for (int i = 0; i < numPartitions; i++) {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
-                            futures.add(pulsar().getAdminClient().topics().triggerCompactionAsync(topicNamePartition.toString()));
+                            futures.add(pulsar()
+                                    .getAdminClient()
+                                    .topics()
+                                    .triggerCompactionAsync(topicNamePartition.toString()));
                         } catch (Exception e) {
-                            log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicNamePartition, e);
+                            log.error("[{}] Failed to trigger compaction on topic {}",
+                                    clientAppId(), topicNamePartition, e);
                             asyncResponse.resume(new RestException(e));
                             return;
                         }
@@ -2946,7 +3054,8 @@ public class PersistentTopicsBase extends AdminResource {
                                 asyncResponse.resume(th);
                                 return null;
                             } else {
-                                log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, exception);
+                                log.error("[{}] Failed to trigger compaction on topic {}",
+                                        clientAppId(), topicName, exception);
                                 asyncResponse.resume(new RestException(exception));
                                 return null;
                             }
@@ -3011,8 +3120,9 @@ public class PersistentTopicsBase extends AdminResource {
         return topic.offloadStatus();
     }
 
-    public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
-            String clientAppId, String originalPrincipal, AuthenticationDataSource authenticationData, TopicName topicName) {
+    public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
+            PulsarService pulsar, String clientAppId, String originalPrincipal,
+            AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
         try {
             // (1) authorize client
@@ -3020,7 +3130,8 @@ public class PersistentTopicsBase extends AdminResource {
                 checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
             } catch (RestException e) {
                 try {
-                    validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
+                    validateAdminAccessForTenant(pulsar,
+                            clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
                 } catch (RestException authException) {
                     log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
                     throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
@@ -3084,7 +3195,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     /**
-     * Get the Topic object reference from the Pulsar broker
+     * Get the Topic object reference from the Pulsar broker.
      */
     private Topic getTopicReference(TopicName topicName) {
         try {
@@ -3106,8 +3217,8 @@ public class PersistentTopicsBase extends AdminResource {
         PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(
                 TopicName.get(topicName.getPartitionedTopicName()), false, false);
         if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
-            final String topicErrorType = partitionedTopicMetadata == null ?
-                    "has no metadata" : "has zero partitions";
+            final String topicErrorType = partitionedTopicMetadata
+                    == null ? "has no metadata" : "has zero partitions";
             return new RestException(Status.NOT_FOUND, String.format(
                     "Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
         } else if (!internalGetList().contains(topicName.toString())) {
@@ -3117,17 +3228,19 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private Topic getOrCreateTopic(TopicName topicName) {
-        return pulsar().getBrokerService().getTopic(topicName.toString(), true).thenApply(Optional::get).join();
+        return pulsar().getBrokerService().getTopic(
+                topicName.toString(), true).thenApply(Optional::get).join();
     }
 
     /**
-     * Get the Subscription object reference from the Topic reference
+     * Get the Subscription object reference from the Topic reference.
      */
     private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
         try {
             Subscription sub = topic.getSubscription(subName);
             if (sub == null) {
-                sub = topic.createSubscription(subName, InitialPosition.Earliest, false).get();
+                sub = topic.createSubscription(subName,
+                        InitialPosition.Earliest, false).get();
             }
 
             return checkNotNull(sub);
@@ -3137,7 +3250,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     /**
-     * Get the Replicator object reference from the Topic reference
+     * Get the Replicator object reference from the Topic reference.
      */
     private PersistentReplicator getReplicatorReference(String replName, PersistentTopic topic) {
         try {
@@ -3176,12 +3289,10 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     /**
-     * It creates subscriptions for new partitions of existing partitioned-topics
+     * It creates subscriptions for new partitions of existing partitioned-topics.
      *
-     * @param topicName
-     *            : topic-name: persistent://prop/cluster/ns/topic
-     * @param numPartitions
-     *            : number partitions for the topics
+     * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
+     * @param numPartitions : number partitions for the topics
      */
     private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
         CompletableFuture<Void> result = new CompletableFuture<>();
@@ -3228,7 +3339,8 @@ public class PersistentTopicsBase extends AdminResource {
                     log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
                     result.complete(null);
                 }).exceptionally(ex -> {
-                    log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
+                    log.warn("[{}] Failed to create subscriptions on new partitions for {}",
+                            clientAppId(), topicName, ex);
                     result.completeExceptionally(ex);
                     return null;
                 });
@@ -3237,13 +3349,15 @@ public class PersistentTopicsBase extends AdminResource {
                     // The first partition doesn't exist, so there are currently to subscriptions to recreate
                     result.complete(null);
                 } else {
-                    log.warn("[{}] Failed to get list of subscriptions of {}", clientAppId(), topicName.getPartition(0), ex);
+                    log.warn("[{}] Failed to get list of subscriptions of {}",
+                            clientAppId(), topicName.getPartition(0), ex);
                     result.completeExceptionally(ex);
                 }
                 return null;
             });
         }).exceptionally(ex -> {
-            log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), topicName.toString());
+            log.warn("[{}] Failed to get partition metadata for {}",
+                    clientAppId(), topicName.toString());
             result.completeExceptionally(ex);
             return null;
         });
@@ -3262,7 +3376,8 @@ public class PersistentTopicsBase extends AdminResource {
         final String userAgent = httpRequest.getHeader("User-Agent");
         if (StringUtils.isBlank(userAgent)) {
             throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Client lib is not compatible to access partitioned metadata: version in user-agent is not present");
+                    "Client lib is not compatible to"
+                            + " access partitioned metadata: version in user-agent is not present");
         }
         // Version < 1.20 for cpp-client is not allowed
         if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
@@ -3307,15 +3422,21 @@ public class PersistentTopicsBase extends AdminResource {
                     long suffix = Long.parseLong(exsitingTopicName.substring(
                             exsitingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
                                     + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
-                    // Skip partition of partitioned topic by making sure the numeric suffix greater than old partition number.
+                    // Skip partition of partitioned topic by making sure
+                    // the numeric suffix greater than old partition number.
                     if (suffix >= oldPartition && suffix <= (long) numberOfPartition) {
-                        log.warn("[{}] Already have non partition topic {} which contains partition " +
-                                "suffix '-partition-' and end with numeric value smaller than the new number of partition. " +
-                                "Update of partitioned topic {} could cause conflict.", clientAppId(), exsitingTopicName, topicName);
+                        log.warn(
+                                "[{}] Already have non partition topic {} which contains partition suffix"
+                                        + " '-partition-' and end with numeric value smaller than the new number"
+                                        + " of partition. Update of partitioned topic {} could cause conflict.",
+                                clientAppId(),
+                                exsitingTopicName, topicName);
                         throw new RestException(Status.PRECONDITION_FAILED,
-                                "Already have non partition topic" + exsitingTopicName + " which contains partition suffix '-partition-' " +
-                                        "and end with numeric value and end with numeric value smaller than the new " +
-                                        "number of partition. Update of partitioned topic " + topicName + " could cause conflict.");
+                                "Already have non partition topic" + exsitingTopicName
+                                        + " which contains partition suffix '-partition-' "
+                                        + "and end with numeric value and end with numeric value smaller than the new "
+                                        + "number of partition. Update of partitioned topic "
+                                        + topicName + " could cause conflict.");
                     }
                 } catch (NumberFormatException e) {
                     // Do nothing, if value after partition suffix is not pure numeric value,
@@ -3343,26 +3464,27 @@ public class PersistentTopicsBase extends AdminResource {
                 int partitionIndex = topicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
                 long suffix = Long.parseLong(topicName.substring(partitionIndex
                         + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
-                TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName.substring(0, partitionIndex));
+                TopicName partitionTopicName = TopicName.get(domain(),
+                        namespaceName, topicName.substring(0, partitionIndex));
                 PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
 
                 // Partition topic index is 0 to (number of partition - 1)
                 if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) {
-                    log.warn("[{}] Can't create topic {} with \"-partition-\" followed by" +
-                            " a number smaller then number of partition of partitioned topic {}.",
+                    log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
+                                    + " a number smaller then number of partition of partitioned topic {}.",
                             clientAppId(), topicName, partitionTopicName.getLocalName());
                     throw new RestException(Status.PRECONDITION_FAILED,
-                            "Can't create topic " + topicName + " with \"-partition-\" followed by" +
-                            " a number smaller then number of partition of partitioned topic " +
-                                    partitionTopicName.getLocalName());
+                            "Can't create topic " + topicName + " with \"-partition-\" followed by"
+                                    + " a number smaller then number of partition of partitioned topic "
+                                    + partitionTopicName.getLocalName());
                 } else if (metadata.partitions == 0) {
-                    log.warn("[{}] Can't create topic {} with \"-partition-\" followed by" +
-                                    " numeric value if there isn't a partitioned topic {} created.",
+                    log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
+                                    + " numeric value if there isn't a partitioned topic {} created.",
                             clientAppId(), topicName, partitionTopicName.getLocalName());
                     throw new RestException(Status.PRECONDITION_FAILED,
-                            "Can't create topic " + topicName + " with \"-partition-\" followed by" +
-                                    " numeric value if there isn't a partitioned topic " +
-                                    partitionTopicName.getLocalName() + " created.");
+                            "Can't create topic " + topicName + " with \"-partition-\" followed by"
+                                    + " numeric value if there isn't a partitioned topic "
+                                    + partitionTopicName.getLocalName() + " created.");
                 }
                 // If there is a  partitioned topic with the same name and numeric suffix is smaller than the
                 // number of partition for that partitioned topic, validation will pass.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index 81addf6..a38ef6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -66,77 +66,96 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request (The Pulsar Sink already exists, etc.)"),
             @ApiResponse(code = 200, message = "Pulsar Sink successfully created"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to get tenant data, failed to process package, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to authorize,"
+                            + " failed to get tenant data, failed to process package, etc.)"),
             @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void registerSink(@ApiParam(value = "The tenant of a Pulsar Sink")
-                             final @PathParam("tenant") String tenant,
-                             @ApiParam(value = "The namespace of a Pulsar Sink")
-                             final @PathParam("namespace") String namespace,
-                             @ApiParam(value = "The name of a Pulsar Sink")
-                             final @PathParam("sinkName") String sinkName,
+    public void registerSink(@ApiParam(value = "The tenant of a Pulsar Sink") final @PathParam("tenant") String tenant,
+                             @ApiParam(value = "The namespace of a Pulsar Sink") final @PathParam("namespace")
+                                     String namespace,
+                             @ApiParam(value = "The name of a Pulsar Sink") final @PathParam("sinkName")
+                                         String sinkName,
                              final @FormDataParam("data") InputStream uploadedInputStream,
                              final @FormDataParam("data") FormDataContentDisposition fileDetail,
                              final @FormDataParam("url") String sinkPkgUrl,
-                             @ApiParam(
-                                 value =
-                                     "A JSON value presenting config payload of a Pulsar Sink. All available configuration options are:  \n" +
-                                     "- **classname**  \n" +
-                                     "   The class name of a Pulsar Sink if archive is file-url-path (file://)  \n" +
-                                     "- **sourceSubscriptionName**  \n" +
-                                     "   Pulsar source subscription name if user wants a specific  \n" +
-                                     "   subscription-name for input-topic consumer  \n" +
-                                     "- **inputs**  \n" +
-                                     "   The input topic or topics of a Pulsar Sink (specified as a JSON array)  \n" +
-                                     "- **topicsPattern**  \n" +
-                                     "   TopicsPattern to consume from list of topics under a namespace that " +
-                                     "   match the pattern. [input] and [topicsPattern] are mutually " +
-                                     "   exclusive. Add SerDe class name for a pattern in customSerdeInputs " +
-                                     "   (supported for java fun only)" +
-                                     "- **topicToSerdeClassName**  \n" +
-                                     "   The map of input topics to SerDe class names (specified as a JSON object)  \n" +
-                                     "- **topicToSchemaType**  \n" +
-                                     "   The map of input topics to Schema types or class names (specified as a JSON object)  \n" +
-                                     "- **inputSpecs**  \n" +
-                                     "   The map of input topics to its consumer configuration, each configuration has schema of " +
-                                     "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5}  \n" +
-                                     "- **configs**  \n" +
-                                     "   The map of configs (specified as a JSON object)  \n" +
-                                     "- **secrets**  \n" +
-                                     "   a map of secretName(aka how the secret is going to be \n" +
-                                     "   accessed in the function via context) to an object that \n" +
-                                     "   encapsulates how the secret is fetched by the underlying \n" +
-                                     "   secrets provider. The type of an value here can be found by the \n" +
-                                     "   SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object)  \n" +
-                                     "- **parallelism**  \n" +
-                                     "   The parallelism factor of a Pulsar Sink (i.e. the number of a Pulsar Sink instances to run \n" +
-                                     "- **processingGuarantees**  \n" +
-                                     "   The processing guarantees (aka delivery semantics) applied to the Pulsar Sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\"  \n" +
-                                     "- **retainOrdering**  \n" +
-                                     "   Boolean denotes whether the Pulsar Sink consumes and processes messages in order  \n" +
-                                     "- **resources**  \n" +
-                                     "   {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per Pulsar Sink instance (applicable only to Docker runtime)  \n" +
-                                     "- **autoAck**  \n" +
-                                     "   Boolean denotes whether or not the framework will automatically acknowledge messages  \n" +
-                                     "- **timeoutMs**  \n" +
-                                     "   Long denotes the message timeout in milliseconds  \n" +
-                                     "- **cleanupSubscription**  \n" +
-                                     "   Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted  \n" +
-                                     "- **runtimeFlags**  \n" +
-                                     "   Any flags that you want to pass to the runtime as a single string  \n",
-                                 examples = @Example(
-                                     value = @ExampleProperty(
-                                         mediaType = MediaType.APPLICATION_JSON,
-                                         value = "{  \n" +
-                                             "\t\"classname\": \"org.example.MySinkTest\",\n" +
-                                             "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" +
-                                             "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" +
-                                             "\t\"parallelism\": 10\n" +
-                                             "}"
-                                     )
+                             @ApiParam(value =
+                                     "A JSON value presenting config payload of a Pulsar Sink."
+                                             + " All available configuration options are:\n"
+                                             + "- **classname**\n"
+                                             + "   The class name of a Pulsar Sink if"
+                                             + " archive is file-url-path (file://)\n"
+                                             + "- **sourceSubscriptionName**\n"
+                                             + "   Pulsar source subscription name if"
+                                             + " user wants a specific\n"
+                                             + "   subscription-name for input-topic consumer\n"
+                                             + "- **inputs**\n"
+                                             + "   The input topic or topics of"
+                                             + " a Pulsar Sink (specified as a JSON array)\n"
+                                             + "- **topicsPattern**\n"
+                                             + "   TopicsPattern to consume from list of topics under a namespace that "
+                                             + "   match the pattern. [input] and [topicsPattern] are mutually "
+                                             + "   exclusive. Add SerDe class name for a pattern in customSerdeInputs "
+                                             + "   (supported for java fun only)"
+                                             + "- **topicToSerdeClassName**\n"
+                                             + "   The map of input topics to SerDe class names"
+                                             + " (specified as a JSON object)\n"
+                                             + "- **topicToSchemaType**\n"
+                                             + "   The map of input topics to Schema types or class names"
+                                             + " (specified as a JSON object)\n"
+                                             + "- **inputSpecs**\n"
+                                             + "   The map of input topics to its consumer configuration,"
+                                             + " each configuration has schema of "
+                                             + "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
+                                             + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
+                                             + "- **configs**\n"
+                                             + "   The map of configs (specified as a JSON object)\n"
+                                             + "- **secrets**\n"
+                                             + "   a map of secretName(aka how the secret is going to be \n"
+                                             + "   accessed in the function via context) to an object that \n"
+                                             + "   encapsulates how the secret is fetched by the underlying \n"
+                                             + "   secrets provider. The type of an value here can be found by the \n"
+                                             + "   SecretProviderConfigurator.getSecretObjectType() method."
+                                             + " (specified as a JSON object)\n"
+                                             + "- **parallelism**\n"
+                                             + "   The parallelism factor of a Pulsar Sink"
+                                             + " (i.e. the number of a Pulsar Sink instances to run \n"
+                                             + "- **processingGuarantees**\n"
+                                             + "   The processing guarantees (aka delivery semantics) applied to"
+                                             + " the Pulsar Sink. Possible Values: \"ATLEAST_ONCE\","
+                                             + " \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\"\n"
+                                             + "- **retainOrdering**\n"
+                                             + "   Boolean denotes whether the Pulsar Sink"
+                                             + " consumes and processes messages in order\n"
+                                             + "- **resources**\n"
+                                             + "   {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores),"
+                                             + " RAM (in bytes) and disk (in bytes) that needs to be "
+                                             + "allocated per Pulsar Sink instance "
+                                             + "(applicable only to Docker runtime)\n"
+                                             + "- **autoAck**\n"
+                                             + "   Boolean denotes whether or not the framework"
+                                             + " will automatically acknowledge messages\n"
+                                             + "- **timeoutMs**\n"
+                                             + "   Long denotes the message timeout in milliseconds\n"
+                                             + "- **cleanupSubscription**\n"
+                                             + "   Boolean denotes whether the subscriptions the functions"
+                                             + " created/used should be deleted when the functions is deleted\n"
+                                             + "- **runtimeFlags**\n"
+                                             + "   Any flags that you want to pass to the runtime as a single string\n",
+                                     examples = @Example(
+                                             value = @ExampleProperty(
+                                                     mediaType = MediaType.APPLICATION_JSON,
+                                                     value = "{\n"
+                                                             + "\t\"classname\": \"org.example.MySinkTest\",\n"
+                                                             + "\t\"inputs\": ["
+                                                             + "\"persistent://public/default/sink-input\"],\n"
+                                                             + "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n"
+                                                             + "\t\"parallelism\": 10\n"
+                                                             + "}"
+                                             )
                                  )
                              )
                              final @FormDataParam("sinkConfig") SinkConfig sinkConfig) {
@@ -147,80 +166,96 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @PUT
     @ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode")
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request (The Pulsar Sink doesn't exist, update contains no change, etc.)"),
+            @ApiResponse(code = 400, message =
+                    "Invalid request (The Pulsar Sink doesn't exist, update contains no change, etc.)"),
             @ApiResponse(code = 200, message = "Pulsar Sink successfully updated"),
             @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
             @ApiResponse(code = 404, message = "The Pulsar Sink doesn't exist"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to process package, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to authorize, failed to process package, etc.)"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void updateSink(@ApiParam(value = "The tenant of a Pulsar Sink")
-                           final @PathParam("tenant") String tenant,
-                           @ApiParam(value = "The namespace of a Pulsar Sink")
-                           final @PathParam("namespace") String namespace,
-                           @ApiParam(value = "The name of a Pulsar Sink")
-                           final @PathParam("sinkName") String sinkName,
+    public void updateSink(@ApiParam(value = "The tenant of a Pulsar Sink") final @PathParam("tenant") String tenant,
+                           @ApiParam(value = "The namespace of a Pulsar Sink") final @PathParam("namespace")
+                                   String namespace,
+                           @ApiParam(value = "The name of a Pulsar Sink") final @PathParam("sinkName") String sinkName,
                            final @FormDataParam("data") InputStream uploadedInputStream,
                            final @FormDataParam("data") FormDataContentDisposition fileDetail,
                            final @FormDataParam("url") String sinkPkgUrl,
-                           @ApiParam(
-                               value =
-                                   "A JSON value presenting config payload of a Pulsar Sink. All available configuration options are:  \n" +
-                                       "- **classname**  \n" +
-                                       "   The class name of a Pulsar Sink if archive is file-url-path (file://)  \n" +
-                                       "- **sourceSubscriptionName**  \n" +
-                                       "   Pulsar source subscription name if user wants a specific  \n" +
-                                       "   subscription-name for input-topic consumer  \n" +
-                                       "- **inputs**  \n" +
-                                       "   The input topic or topics of a Pulsar Sink (specified as a JSON array)  \n" +
-                                       "- **topicsPattern**  \n" +
-                                       "   TopicsPattern to consume from list of topics under a namespace that " +
-                                       "   match the pattern. [input] and [topicsPattern] are mutually " +
-                                       "   exclusive. Add SerDe class name for a pattern in customSerdeInputs " +
-                                       "   (supported for java fun only)" +
-                                       "- **topicToSerdeClassName**  \n" +
-                                       "   The map of input topics to SerDe class names (specified as a JSON object)  \n" +
-                                       "- **topicToSchemaType**  \n" +
-                                       "   The map of input topics to Schema types or class names (specified as a JSON object)  \n" +
-                                       "- **inputSpecs**  \n" +
-                                       "   The map of input topics to its consumer configuration, each configuration has schema of " +
-                                       "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5}  \n" +
-                                       "- **configs**  \n" +
-                                       "   The map of configs (specified as a JSON object)  \n" +
-                                       "- **secrets**  \n" +
-                                       "   a map of secretName(aka how the secret is going to be \n" +
-                                       "   accessed in the function via context) to an object that \n" +
-                                       "   encapsulates how the secret is fetched by the underlying \n" +
-                                       "   secrets provider. The type of an value here can be found by the \n" +
-                                       "   SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object)  \n" +
-                                       "- **parallelism**  \n" +
-                                       "   The parallelism factor of a Pulsar Sink (i.e. the number of a Pulsar Sink instances to run \n" +
-                                       "- **processingGuarantees**  \n" +
-                                       "   The processing guarantees (aka delivery semantics) applied to the Pulsar Sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\"  \n" +
-                                       "- **retainOrdering**  \n" +
-                                       "   Boolean denotes whether the Pulsar Sink consumes and processes messages in order  \n" +
-                                       "- **resources**  \n" +
-                                       "   {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per Pulsar Sink instance (applicable only to Docker runtime)  \n" +
-                                       "- **autoAck**  \n" +
-                                       "   Boolean denotes whether or not the framework will automatically acknowledge messages  \n" +
-                                       "- **timeoutMs**  \n" +
-                                       "   Long denotes the message timeout in milliseconds  \n" +
-                                       "- **cleanupSubscription**  \n" +
-                                       "   Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted  \n" +
-                                       "- **runtimeFlags**  \n" +
-                                       "   Any flags that you want to pass to the runtime as a single string  \n",
-                               examples = @Example(
-                                   value = @ExampleProperty(
-                                       mediaType = MediaType.APPLICATION_JSON,
-                                       value = "{  \n" +
-                                           "\t\"classname\": \"org.example.SinkStressTest\",  \n" +
-                                               "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" +
-                                               "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" +
-                                               "\t\"parallelism\": 5\n" +
-                                               "}"
-                                   )
+                           @ApiParam(value =
+                                   "A JSON value presenting config payload of a Pulsar Sink."
+                                           + " All available configuration options are:\n"
+                                           + "- **classname**\n"
+                                           + "   The class name of a Pulsar Sink if"
+                                           + " archive is file-url-path (file://)\n"
+                                           + "- **sourceSubscriptionName**\n"
+                                           + "   Pulsar source subscription name if user wants a specific\n"
+                                           + "   subscription-name for input-topic consumer\n"
+                                           + "- **inputs**\n"
+                                           + "   The input topic or topics of"
+                                           + " a Pulsar Sink (specified as a JSON array)\n"
+                                           + "- **topicsPattern**\n"
+                                           + "   TopicsPattern to consume from list of topics under a namespace that "
+                                           + "   match the pattern. [input] and [topicsPattern] are mutually "
+                                           + "   exclusive. Add SerDe class name for a pattern in customSerdeInputs "
+                                           + "   (supported for java fun only)"
+                                           + "- **topicToSerdeClassName**\n"
+                                           + "   The map of input topics to"
+                                           + " SerDe class names (specified as a JSON object)\n"
+                                           + "- **topicToSchemaType**\n"
+                                           + "   The map of input topics to Schema types or"
+                                           + " class names (specified as a JSON object)\n"
+                                           + "- **inputSpecs**\n"
+                                           + "   The map of input topics to its consumer configuration,"
+                                           + " each configuration has schema of "
+                                           + "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
+                                           + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
+                                           + "- **configs**\n"
+                                           + "   The map of configs (specified as a JSON object)\n"
+                                           + "- **secrets**\n"
+                                           + "   a map of secretName(aka how the secret is going to be \n"
+                                           + "   accessed in the function via context) to an object that \n"
+                                           + "   encapsulates how the secret is fetched by the underlying \n"
+                                           + "   secrets provider. The type of an value here can be found by the \n"
+                                           + "   SecretProviderConfigurator.getSecretObjectType() method."
+                                           + " (specified as a JSON object)\n"
+                                           + "- **parallelism**\n"
+                                           + "   The parallelism factor of a Pulsar Sink "
+                                           + "(i.e. the number of a Pulsar Sink instances to run \n"
+                                           + "- **processingGuarantees**\n"
+                                           + "   The processing guarantees (aka delivery semantics) applied to the"
+                                           + " Pulsar Sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\","
+                                           + " \"EFFECTIVELY_ONCE\"\n"
+                                           + "- **retainOrdering**\n"
+                                           + "   Boolean denotes whether the Pulsar Sink"
+                                           + " consumes and processes messages in order\n"
+                                           + "- **resources**\n"
+                                           + "   {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores),"
+                                           + " RAM (in bytes) and disk (in bytes) that needs to be allocated per"
+                                           + " Pulsar Sink instance (applicable only to Docker runtime)\n"
+                                           + "- **autoAck**\n"
+                                           + "   Boolean denotes whether or not the framework will"
+                                           + " automatically acknowledge messages\n"
+                                           + "- **timeoutMs**\n"
+                                           + "   Long denotes the message timeout in milliseconds\n"
+                                           + "- **cleanupSubscription**\n"
+                                           + "   Boolean denotes whether the subscriptions the functions"
+                                           + " created/used should be deleted when the functions is deleted\n"
+                                           + "- **runtimeFlags**\n"
+                                           + "   Any flags that you want to pass to the runtime as a single string\n",
+                                   examples = @Example(
+                                           value = @ExampleProperty(
+                                                   mediaType = MediaType.APPLICATION_JSON,
+                                                   value = "{\n"
+                                                           + "\t\"classname\": \"org.example.SinkStressTest\",\n"
+                                                           + "\t\"inputs\": ["
+                                                           + "\"persistent://public/default/sink-input\"],\n"
+                                                           + "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n"
+                                                           + "\t\"parallelism\": 5\n"
+                                                           + "}"
+                                           )
                                )
                            )
                            final @FormDataParam("sinkConfig") SinkConfig sinkConfig,
@@ -239,7 +274,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
             @ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
             @ApiResponse(code = 200, message = "The Pulsar Sink was successfully deleted"),
             @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to deregister, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to authorize, failed to deregister, etc.)"),
             @ApiResponse(code = 408, message = "Got InterruptedException while deregistering the Pulsar Sink"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
@@ -350,7 +386,9 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
             @ApiResponse(code = 400, message = "Invalid restart request"),
             @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
             @ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to restart the instance of a Pulsar Sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to restart the instance of"
+                            + " a Pulsar Sink, failed to authorize, etc.)"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
@@ -363,7 +401,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
                             final @PathParam("sinkName") String sinkName,
                             @ApiParam(value = "The instanceId of a Pulsar Sink")
                             final @PathParam("instanceId") String instanceId) {
-        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -372,7 +411,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
             @ApiResponse(code = 400, message = "Invalid restart request"),
             @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
             @ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to restart the Pulsar Sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to restart the Pulsar Sink, failed to authorize, etc.)"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}/restart")
@@ -391,7 +431,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid stop request"),
             @ApiResponse(code = 404, message = "The Pulsar Sink instance does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to stop the Pulsar Sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to stop the Pulsar Sink, failed to authorize, etc.)"),
             @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
@@ -405,7 +446,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
                          final @PathParam("sinkName") String sinkName,
                          @ApiParam(value = "The instanceId of a Pulsar Sink")
                          final @PathParam("instanceId") String instanceId) {
-        sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+        sink.stopFunctionInstance(tenant, namespace,
+                sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -413,7 +455,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid stop request"),
             @ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to stop the Pulsar Sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to stop the Pulsar Sink, failed to authorize, etc.)"),
             @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
@@ -433,7 +476,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid start request"),
             @ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to start the Pulsar Sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to start the Pulsar Sink, failed to authorize, etc.)"),
             @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
@@ -447,7 +491,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
                           final @PathParam("sinkName") String sinkName,
                           @ApiParam(value = "The instanceId of a Pulsar Sink")
                           final @PathParam("instanceId") String instanceId) {
-        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -455,7 +500,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid start request"),
             @ApiResponse(code = 404, message = "The Pulsar Sink does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error (failed to start the Pulsar Sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 500, message =
+                    "Internal server error (failed to start the Pulsar Sink, failed to authorize, etc.)"),
             @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 771bdd2..cdd1595 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -65,7 +65,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Creates a new Pulsar Source in cluster mode")
     @ApiResponses(value = {
             @ApiResponse(code = 200, message = "Pulsar Function successfully created"),
-            @ApiResponse(code = 400, message = "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
+            @ApiResponse(code = 400, message =
+                    "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
             @ApiResponse(code = 401, message = "Client is not authorize to perform operation"),
             @ApiResponse(code = 500, message = "Internal Server Error"),
             @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
@@ -84,35 +85,41 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
             final @FormDataParam("data") FormDataContentDisposition fileDetail,
             final @FormDataParam("url") String sourcePkgUrl,
             @ApiParam(
-                    value = "A JSON value presenting configuration payload of a Pulsar Source. An example of the expected functions can be found here.  \n" +
-                            "- **classname**  \n" +
-                            "  The class name of a Pulsar Source if archive is file-url-path (file://).  \n" +
-                            "- **topicName**  \n" +
-                            "  The Pulsar topic to which data is sent.  \n" +
-                            "- **serdeClassName**  \n" +
-                            "  The SerDe classname for the Pulsar Source.  \n" +
-                            "- **schemaType**  \n" +
-                            "  The schema type (either a builtin schema like 'avro', 'json', etc.. or  " +
-                            "  custom Schema class name to be used to encode messages emitted from the Pulsar Source  \n" +
-                            "- **configs**  \n" +
-                            "  Source config key/values  \n" +
-                            "- **secrets**  \n" +
-                            "  This is a map of secretName(that is how the secret is going to be accessed in the function via context) to an object that" +
-                            "  encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the" +
-                            "  SecretProviderConfigurator.getSecretObjectType() method. \n" +
-                            "- **parallelism**  \n" +
-                            "  The parallelism factor of a Pulsar Source (i.e. the number of a Pulsar Source instances to run).  \n" +
-                            "- **processingGuarantees**  \n" +
-                            "  The processing guarantees (aka delivery semantics) applied to the Pulsar Source.  " +
-                            "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]  \n" +
-                            "- **resources**  \n" +
-                            "  The size of the system resources allowed by the Pulsar Source runtime. The resources include: cpu, ram, disk.  \n" +
-                            "- **archive**  \n" +
-                            "  The path to the NAR archive for the Pulsar Source. It also supports url-path " +
-                            "  [http/https/file (file protocol assumes that file already exists on worker host)] " +
-                            "  from which worker can download the package.  \n" +
-                            "- **runtimeFlags**  \n" +
-                            "  Any flags that you want to pass to the runtime.  \n",
+                    value = "A JSON value presenting configuration payload of a Pulsar Source."
+                            + " An example of the expected functions can be found here.\n"
+                            + "- **classname**\n"
+                            + "  The class name of a Pulsar Source if archive is file-url-path (file://).\n"
+                            + "- **topicName**\n"
+                            + "  The Pulsar topic to which data is sent.\n"
+                            + "- **serdeClassName**\n"
+                            + "  The SerDe classname for the Pulsar Source.\n"
+                            + "- **schemaType**\n"
+                            + "  The schema type (either a builtin schema like 'avro', 'json', etc.. or  "
+                            + "  custom Schema class name to be used to"
+                            + " encode messages emitted from the Pulsar Source\n"
+                            + "- **configs**\n"
+                            + "  Source config key/values\n"
+                            + "- **secrets**\n"
+                            + "  This is a map of secretName(that is how the secret is going"
+                            + " to be accessed in the function via context) to an object that"
+                            + "  encapsulates how the secret is fetched by the underlying secrets provider."
+                            + " The type of an value here can be found by the"
+                            + "  SecretProviderConfigurator.getSecretObjectType() method. \n"
+                            + "- **parallelism**\n"
+                            + "  The parallelism factor of a Pulsar Source"
+                            + " (i.e. the number of a Pulsar Source instances to run).\n"
+                            + "- **processingGuarantees**\n"
+                            + "  The processing guarantees (aka delivery semantics) applied to the Pulsar Source.  "
+                            + "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
+                            + "- **resources**\n"
+                            + "  The size of the system resources allowed by the Pulsar Source runtime."
+                            + " The resources include: cpu, ram, disk.\n"
+                            + "- **archive**\n"
+                            + "  The path to the NAR archive for the Pulsar Source. It also supports url-path "
+                            + "  [http/https/file (file protocol assumes that file already exists on worker host)] "
+                            + "  from which worker can download the package.\n"
+                            + "- **runtimeFlags**\n"
+                            + "  Any flags that you want to pass to the runtime.\n",
                     examples = @Example(
                             value = @ExampleProperty(
                                     mediaType = MediaType.APPLICATION_JSON,
@@ -138,7 +145,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Updates a Pulsar Source currently running in cluster mode")
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 400, message = "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
+            @ApiResponse(code = 400, message =
+                    "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
             @ApiResponse(code = 401, message = "Client is not authorize to perform operation"),
             @ApiResponse(code = 200, message = "Pulsar Function successfully updated"),
             @ApiResponse(code = 404, message = "Not Found(The Pulsar Source doesn't exist)"),
@@ -158,35 +166,41 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
             final @FormDataParam("data") FormDataContentDisposition fileDetail,
             final @FormDataParam("url") String sourcePkgUrl,
             @ApiParam(
-                    value = "A JSON value presenting configuration payload of a Pulsar Source. An example of the expected functions can be found here.  \n" +
-                            "- **classname**  \n" +
-                            "  The class name of a Pulsar Source if archive is file-url-path (file://).  \n" +
-                            "- **topicName**  \n" +
-                            "  The Pulsar topic to which data is sent.  \n" +
-                            "- **serdeClassName**  \n" +
-                            "  The SerDe classname for the Pulsar Source.  \n" +
-                            "- **schemaType**  \n" +
-                            "  The schema type (either a builtin schema like 'avro', 'json', etc.. or  " +
-                            "  custom Schema class name to be used to encode messages emitted from the Pulsar Source  \n" +
-                            "- **configs**  \n" +
-                            "  Pulsar Source config key/values  \n" +
-                            "- **secrets**  \n" +
-                            "  This is a map of secretName(that is how the secret is going to be accessed in the function via context) to an object that" +
-                            "  encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the" +
-                            "  SecretProviderConfigurator.getSecretObjectType() method. \n" +
-                            "- **parallelism**  \n" +
-                            "  The parallelism factor of a Pulsar Source (i.e. the number of a Pulsar Source instances to run).  \n" +
-                            "- **processingGuarantees**  \n" +
-                            "  The processing guarantees (aka delivery semantics) applied to the Pulsar Source.  " +
-                            "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]  \n" +
-                            "- **resources**  \n" +
-                            "  The size of the system resources allowed by the Pulsar Source runtime. The resources include: cpu, ram, disk.  \n" +
-                            "- **archive**  \n" +
-                            "  The path to the NAR archive for the Pulsar Source. It also supports url-path " +
-                            "  [http/https/file (file protocol assumes that file already exists on worker host)] " +
-                            "  from which worker can download the package.  \n" +
-                            "- **runtimeFlags**  \n" +
-                            "  Any flags that you want to pass to the runtime.  \n",
+                    value = "A JSON value presenting configuration payload of a Pulsar Source."
+                            + " An example of the expected functions can be found here.\n"
+                            + "- **classname**\n"
+                            + "  The class name of a Pulsar Source if archive is file-url-path (file://).\n"
+                            + "- **topicName**\n"
+                            + "  The Pulsar topic to which data is sent.\n"
+                            + "- **serdeClassName**\n"
+                            + "  The SerDe classname for the Pulsar Source.\n"
+                            + "- **schemaType**\n"
+                            + "  The schema type (either a builtin schema like 'avro', 'json', etc.. or  "
+                            + "  custom Schema class name to be used to encode"
+                            + " messages emitted from the Pulsar Source\n"
+                            + "- **configs**\n"
+                            + "  Pulsar Source config key/values\n"
+                            + "- **secrets**\n"
+                            + "  This is a map of secretName(that is how the secret is going to"
+                            + " be accessed in the function via context) to an object that"
+                            + "  encapsulates how the secret is fetched by the underlying secrets provider."
+                            + " The type of an value here can be found by the"
+                            + "  SecretProviderConfigurator.getSecretObjectType() method.\n"
+                            + "- **parallelism**\n"
+                            + "  The parallelism factor of a Pulsar Source"
+                            + " (i.e. the number of a Pulsar Source instances to run).\n"
+                            + "- **processingGuarantees**\n"
+                            + "  The processing guarantees (aka delivery semantics) applied to the Pulsar Source.  "
+                            + "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
+                            + "- **resources**\n"
+                            + "  The size of the system resources allowed by the Pulsar Source runtime."
+                            + " The resources include: cpu, ram, disk.\n"
+                            + "- **archive**\n"
+                            + "  The path to the NAR archive for the Pulsar Source. It also supports url-path "
+                            + "  [http/https/file (file protocol assumes that file already exists on worker host)] "
+                            + "  from which worker can download the package.\n"
+                            + "- **runtimeFlags**\n"
+                            + "  Any flags that you want to pass to the runtime.\n",
                     examples = @Example(
                             value = @ExampleProperty(
                                     mediaType = MediaType.APPLICATION_JSON,
@@ -267,14 +281,12 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
     public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(
-            @ApiParam(value = "The tenant of a Pulsar Source")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Source")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Source")
-            final @PathParam("sourceName") String sourceName,
-            @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided, the stats of all instances is returned).")
-            final @PathParam("instanceId") String instanceId) throws IOException {
+            @ApiParam(value = "The tenant of a Pulsar Source") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Source") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Source") final @PathParam("sourceName") String sourceName,
+            @ApiParam(value = "The instanceId of a Pulsar Source"
+                    + " (if instance-id is not provided, the stats of all instances is returned).") final @PathParam(
+                    "instanceId") String instanceId) throws IOException {
         return source.getSourceInstanceStatus(
             tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
@@ -298,7 +310,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
             final @PathParam("namespace") String namespace,
             @ApiParam(value = "The name of a Pulsar Source")
             final @PathParam("sourceName") String sourceName) throws IOException {
-        return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData());
+        return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(),
+                clientAuthData());
     }
 
     @GET
@@ -336,15 +349,14 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
     public void restartSource(
-            @ApiParam(value = "The tenant of a Pulsar Source")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Source")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Source")
-            final @PathParam("sourceName") String sourceName,
-            @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided, the stats of all instances is returned).")
-            final @PathParam("instanceId") String instanceId) {
-        source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+            @ApiParam(value = "The tenant of a Pulsar Source") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Source") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Source") final @PathParam("sourceName") String sourceName,
+            @ApiParam(value = "The instanceId of a Pulsar Source"
+                    + " (if instance-id is not provided, the stats of all instances is returned).") final @PathParam(
+                    "instanceId") String instanceId) {
+        source.restartFunctionInstance(tenant, namespace, sourceName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -380,15 +392,13 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
     public void stopSource(
-            @ApiParam(value = "The tenant of a Pulsar Source")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Source")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Source")
-            final @PathParam("sourceName") String sourceName,
-            @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided, the stats of all instances is returned).")
-            final @PathParam("instanceId") String instanceId) {
-        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+            @ApiParam(value = "The tenant of a Pulsar Source") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Source") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Source") final @PathParam("sourceName") String sourceName,
+            @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided,"
+                    + " the stats of all instances is returned).") final @PathParam("instanceId") String instanceId) {
+        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -424,15 +434,13 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
     @Consumes(MediaType.APPLICATION_JSON)
     public void startSource(
-            @ApiParam(value = "The tenant of a Pulsar Source")
-            final @PathParam("tenant") String tenant,
-            @ApiParam(value = "The namespace of a Pulsar Source")
-            final @PathParam("namespace") String namespace,
-            @ApiParam(value = "The name of a Pulsar Source")
-            final @PathParam("sourceName") String sourceName,
-            @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided, the stats of all instances is returned).")
-            final @PathParam("instanceId") String instanceId) {
-        source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
+            @ApiParam(value = "The tenant of a Pulsar Source") final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The namespace of a Pulsar Source") final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of a Pulsar Source") final @PathParam("sourceName") String sourceName,
+            @ApiParam(value = "The instanceId of a Pulsar Source (if instance-id is not provided,"
+                    + " the stats of all instances is returned).") final @PathParam("instanceId") String instanceId) {
+        source.startFunctionInstance(tenant, namespace, sourceName, instanceId,
+                uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index 5b0b52d..e1c270d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -127,7 +127,8 @@ public class TenantsBase extends AdminResource {
 
     @POST
     @Path("/{tenant}")
-    @ApiOperation(value = "Update the admins for a tenant.", notes = "This operation requires Pulsar super-user privileges.")
+    @ApiOperation(value = "Update the admins for a tenant.",
+            notes = "This operation requires Pulsar super-user privileges.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
             @ApiResponse(code = 409, message = "Tenant already exists"),
@@ -168,7 +169,8 @@ public class TenantsBase extends AdminResource {
                 if (!clustersWithActiveNamespaces.isEmpty()) {
                     // Throw an exception because colos being removed are having active namespaces
                     String msg = String.format(
-                            "Failed to update the tenant because active namespaces are present in colos %s. Please delete those namespaces first",
+                            "Failed to update the tenant because active namespaces are present in colos %s."
+                                    + " Please delete those namespaces first",
                             clustersWithActiveNamespaces);
                     throw new RestException(Status.CONFLICT, msg);
                 }
@@ -233,8 +235,9 @@ public class TenantsBase extends AdminResource {
 
     private void validateClusters(TenantInfo info) {
         // empty cluster shouldn't be allowed
-        if (info == null || info.getAllowedClusters().stream().filter(c -> !StringUtils.isBlank(c)).collect(Collectors.toSet()).isEmpty()
-            || info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) {
+        if (info == null || info.getAllowedClusters().stream()
+                .filter(c -> !StringUtils.isBlank(c)).collect(Collectors.toSet()).isEmpty()
+                || info.getAllowedClusters().stream().anyMatch(ac -> StringUtils.isBlank(ac))) {
             log.warn("[{}] Failed to validate due to clusters are empty", clientAppId());
             throw new RestException(Status.PRECONDITION_FAILED, "Clusters can not be empty");
         }