You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/20 10:22:34 UTC

[pulsar] branch master updated: [conf] Add annotations for documenting broker configuration settings (#3113)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c3f2a7   [conf] Add annotations for documenting broker configuration settings (#3113)
9c3f2a7 is described below

commit 9c3f2a77bb352c073d2dc36094b42dc3c9918c3b
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Thu Dec 20 18:22:29 2018 +0800

     [conf] Add annotations for documenting broker configuration settings (#3113)
    
    *Motivation*
    
    This change is adding annotations to broker configuration for generating broker configuration file.
---
 .../apache/pulsar/broker/ServiceConfiguration.java | 1036 +++++++++++++++-----
 .../configuration/PulsarConfigurationLoader.java   |    7 +-
 2 files changed, 784 insertions(+), 259 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3939ceb..75ee5d2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -30,6 +30,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.configuration.Category;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -41,484 +42,996 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 @Setter
 public class ServiceConfiguration implements PulsarConfiguration {
 
+    @Category
+    private static final String CATEGORY_SERVER = "Server";
+    @Category
+    private static final String CATEGORY_STORAGE_BK = "Storage (BookKeeper)";
+    @Category
+    private static final String CATEGORY_STORAGE_ML = "Storage (Managed Ledger)";
+    @Category
+    private static final String CATEGORY_STORAGE_OFFLOADING = "Storage (Ledger Offloading)";
+    @Category
+    private static final String CATEGORY_POLICIES = "Policies";
+    @Category
+    private static final String CATEGORY_WEBSOCKET = "WebSocket";
+    @Category
+    private static final String CATEGORY_SCHEMA = "Schema";
+    @Category
+    private static final String CATEGORY_METRICS = "Metrics";
+    @Category
+    private static final String CATEGORY_REPLICATION = "Replication";
+    @Category
+    private static final String CATEGORY_LOAD_BALANCER = "Load Balancer";
+    @Category
+    private static final String CATEGORY_FUNCTIONS = "Functions";
+    @Category
+    private static final String CATEGORY_TLS = "TLS";
+    @Category
+    private static final String CATEGORY_AUTHENTICATION = "Authentication";
+    @Category
+    private static final String CATEGORY_AUTHORIZATION = "Authorization";
+    @Category
+    private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider";
+    @Category
+    private static final String CATEGORY_HTTP = "HTTP";
+
     /***** --- pulsar configuration --- ****/
-    // Zookeeper quorum connection string
-    @FieldContext(required = true)
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        required = true,
+        doc = "The Zookeeper quorum connection string (as a comma-separated list)"
+    )
     private String zookeeperServers;
-    // Global Zookeeper quorum connection string
     @Deprecated
-    @FieldContext(required = false)
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        required = false,
+        deprecated = true,
+        doc = "Global Zookeeper quorum connection string (as a comma-separated list)."
+            + " Deprecated in favor of using `configurationStoreServers`"
+    )
     private String globalZookeeperServers;
-    // Configuration Store connection string
-    @FieldContext(required = false)
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        required = false,
+        doc = "Configuration store connection string (as a comma-separated list)"
+    )
     private String configurationStoreServers;
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "The port for serving binary protobuf requests"
+    )
     private Integer brokerServicePort = 6650;
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "The port for serving tls secured binary protobuf requests"
+    )
     private Integer brokerServicePortTls = null;
-    // Port to use to server HTTP request
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "The port for serving http requests"
+    )
     private Integer webServicePort = 8080;
-    // Port to use to server HTTPS request
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "The port for serving https requests"
+    )
     private Integer webServicePortTls = null;
 
-    // Hostname or IP address the service binds on.
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Hostname or IP address the service binds on"
+    )
     private String bindAddress = "0.0.0.0";
 
-    // Controls which hostname is advertised to the discovery service via ZooKeeper.
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Hostname or IP address the service advertises to the outside world."
+            + " If not set, the value of `InetAddress.getLocalHost().getHostname()` is used."
+    )
     private String advertisedAddress;
 
-    // Number of threads to use for Netty IO
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Number of threads to use for Netty IO."
+            + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`"
+    )
     private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
 
-    // Enable the WebSocket API service
+    @FieldContext(
+        category = CATEGORY_WEBSOCKET,
+        doc = "Enable the WebSocket API service in broker"
+    )
     private boolean webSocketServiceEnabled = false;
 
-    // Flag to control features that are meant to be used when running in standalone mode
+    @FieldContext(
+        category = CATEGORY_WEBSOCKET,
+        doc = "Flag indicates whether to run broker in standalone mode"
+    )
     private boolean isRunningStandalone = false;
 
-    // Name of the cluster to which this broker belongs to
-    @FieldContext(required = true)
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        required = true,
+        doc = "Name of the cluster to which this broker belongs to"
+    )
     private String clusterName;
-    // Enable cluster's failure-domain which can distribute brokers into logical region
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        dynamic = true,
+        doc = "Enable cluster's failure-domain which can distribute brokers into logical region"
+    )
     private boolean failureDomainsEnabled = false;
-    // Zookeeper session timeout in milliseconds
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "ZooKeeper session timeout in milliseconds"
+    )
     private long zooKeeperSessionTimeoutMillis = 30000;
-    // Time to wait for broker graceful shutdown. After this time elapses, the
-    // process will be killed
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        dynamic = true,
+        doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
+    )
     private long brokerShutdownTimeoutMs = 60000;
-    // Enable backlog quota check. Enforces action on topic when the quota is
-    // reached
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Enable backlog quota check. Enforces actions on topic when the quota is reached"
+    )
     private boolean backlogQuotaCheckEnabled = true;
-    // How often to check for topics that have reached the quota
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "How often to check for topics that have reached the quota."
+            + " It only takes effects when `backlogQuotaCheckEnabled` is true"
+    )
     private int backlogQuotaCheckIntervalInSeconds = 60;
-    // Default per-topic backlog quota limit
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Default per-topic backlog quota limit. Increase it if you want to allow larger msg backlog"
+    )
     private long backlogQuotaDefaultLimitGB = 50;
-    //Default backlog quota retention policy. Default is producer_request_hold
-    //'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
-    //'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
-    //'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Default backlog quota retention policy. Default is producer_request_hold\n\n"
+            + "'producer_request_hold' Policy which holds producer's send request until the"
+            + "resource becomes available (or holding times out)\n"
+            + "'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer\n"
+            + "'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog"
+    )
     private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy.producer_request_hold;
-    // Enable the deletion of inactive topics
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Enable the deletion of inactive topics"
+    )
     private boolean brokerDeleteInactiveTopicsEnabled = true;
-    // How often to check for inactive topics
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "How often to check for inactive topics"
+    )
     private long brokerDeleteInactiveTopicsFrequencySeconds = 60;
-    // How frequently to proactively check and purge expired messages
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "How frequently to proactively check and purge expired messages"
+    )
     private int messageExpiryCheckIntervalInMinutes = 5;
-    // How long to delay rewinding cursor and dispatching messages when active consumer is changed
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed"
+    )
     private int activeConsumerFailoverDelayTimeMillis = 1000;
-    // How long to delete inactive subscriptions from last consuming
-    // When it is 0, inactive subscriptions are not deleted automatically
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "How long to delete inactive subscriptions from last consuming."
+            + " When it is 0, inactive subscriptions are not deleted automatically"
+    )
     private long subscriptionExpirationTimeMinutes = 0;
-    // How frequently to proactively check and purge expired subscription
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "How frequently to proactively check and purge expired subscription"
+    )
     private long subscriptionExpiryCheckIntervalInMinutes = 5;
 
-    // Set the default behavior for message deduplication in the broker
-    // This can be overridden per-namespace. If enabled, broker will reject
-    // messages that were already stored in the topic
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Set the default behavior for message deduplication in the broker.\n\n"
+            + "This can be overridden per-namespace. If enabled, broker will reject"
+            + " messages that were already stored in the topic"
+    )
     private boolean brokerDeduplicationEnabled = false;
 
-    // Maximum number of producer information that it's going to be
-    // persisted for deduplication purposes
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Maximum number of producer information that it's going to be persisted for deduplication purposes"
+    )
     private int brokerDeduplicationMaxNumberOfProducers = 10000;
 
-    // Number of entries after which a dedup info snapshot is taken.
-    // A bigger interval will lead to less snapshots being taken though it would
-    // increase the topic recovery time, when the entries published after the
-    // snapshot need to be replayed
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Number of entries after which a dedup info snapshot is taken.\n\n"
+            + "A bigger interval will lead to less snapshots being taken though it would"
+            + " increase the topic recovery time, when the entries published after the"
+            + " snapshot need to be replayed"
+    )
     private int brokerDeduplicationEntriesInterval = 1000;
 
-    // Time of inactivity after which the broker will discard the deduplication information
-    // relative to a disconnected producer. Default is 6 hours.
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Time of inactivity after which the broker will discard the deduplication information"
+            + " relative to a disconnected producer. Default is 6 hours.")
     private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;
 
-    // When a namespace is created without specifying the number of bundle, this
-    // value will be used as the default
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "When a namespace is created without specifying the number of bundle, this"
+            + " value will be used as the default")
     private int defaultNumberOfNamespaceBundles = 4;
 
-    // Enable check for minimum allowed client library version
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        dynamic = true,
+        doc = "Enable check for minimum allowed client library version"
+    )
     private boolean clientLibraryVersionCheckEnabled = false;
-    // Path for the file used to determine the rotation status for the broker
-    // when responding to service discovery health checks
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Path for the file used to determine the rotation status for the broker"
+            + " when responding to service discovery health checks")
     private String statusFilePath;
-    // Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker
-    // will stop sending messages to consumer once, this limit reaches until consumer starts acknowledging messages back
-    // and unack count reaches to maxUnackedMessagesPerConsumer/2 Using a value of 0, is disabling unackedMessage-limit
-    // check and consumer can receive messages without any restriction
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Max number of unacknowledged messages allowed to receive messages by a consumer on"
+            + " a shared subscription.\n\n Broker will stop sending messages to consumer once,"
+            + " this limit reaches until consumer starts acknowledging messages back and unack count"
+            + " reaches to `maxUnackedMessagesPerConsumer/2`. Using a value of 0, it is disabling "
+            + " unackedMessage-limit check and consumer can receive messages without any restriction")
     private int maxUnackedMessagesPerConsumer = 50000;
-    // Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to
-    // all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and
-    // unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit
-    // check and dispatcher can dispatch messages without any restriction
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Max number of unacknowledged messages allowed per shared subscription. \n\n"
+            + " Broker will stop dispatching messages to all consumers of the subscription once this "
+            + " limit reaches until consumer starts acknowledging messages back and unack count reaches"
+            + " to `limit/2`. Using a value of 0, is disabling unackedMessage-limit check and dispatcher"
+            + " can dispatch messages without any restriction")
     private int maxUnackedMessagesPerSubscription = 4 * 50000;
-    // Max number of unacknowledged messages allowed per broker. Once this limit reaches, broker will stop dispatching
-    // messages to all shared subscription which has higher number of unack messages until subscriptions start
-    // acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling
-    // unackedMessage-limit check and broker doesn't block dispatchers
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Max number of unacknowledged messages allowed per broker. \n\n"
+            + " Once this limit reaches, broker will stop dispatching messages to all shared subscription "
+            + " which has higher number of unack messages until subscriptions start acknowledging messages "
+            + " back and unack count reaches to `limit/2`. Using a value of 0, is disabling unackedMessage-limit"
+            + " check and broker doesn't block dispatchers")
     private int maxUnackedMessagesPerBroker = 0;
-    // Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
-    // than this percentage limit and subscription will not receive any new messages until that subscription acks back
-    // limit/2 messages
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
+            + " unacked messages than this percentage limit and subscription will not receive any new messages "
+            + " until that subscription acks back `limit/2` messages")
     private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
-    // Too many subscribe requests from a consumer can cause broker rewinding consumer cursors and loading data from bookies,
-    // hence causing high network bandwidth usage
-    // When the positive value is set, broker will throttle the subscribe requests for one consumer.
-    // Otherwise, the throttling will be disabled. The default value of this setting is 0 - throttling is disabled.
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        dynamic = true,
+        doc = "Too many subscribe requests from a consumer can cause broker rewinding consumer cursors "
+            + " and loading data from bookies, hence causing high network bandwidth usage When the positive"
+            + " value is set, broker will throttle the subscribe requests for one consumer. Otherwise, the"
+            + " throttling will be disabled. The default value of this setting is 0 - throttling is disabled.")
     private int subscribeThrottlingRatePerConsumer = 0;
-    // Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
-    @FieldContext(minValue = 1, dynamic = true)
+    @FieldContext(
+        minValue = 1,
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s."
+    )
     private int subscribeRatePeriodPerConsumerInSecond = 30;
-    // Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default
-    // message dispatch-throttling
-    @FieldContext(dynamic = true)
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Default number of message dispatching throttling-limit for every topic. \n\n"
+            + "Using a value of 0, is disabling default message dispatch-throttling")
     private int dispatchThrottlingRatePerTopicInMsg = 0;
-    // Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling
-    // default message-byte dispatch-throttling
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
+            + "Using a value of 0, is disabling default message-byte dispatch-throttling")
     private long dispatchThrottlingRatePerTopicInByte = 0;
-    // Default number of message dispatching throttling-limit for a subscription.
-    // Using a value of 0, is disabling default message dispatch-throttling.
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Default number of message dispatching throttling-limit for a subscription. \n\n"
+            + "Using a value of 0, is disabling default message dispatch-throttling.")
     private int dispatchThrottlingRatePerSubscriptionInMsg = 0;
-    // Default number of message-bytes dispatching throttling-limit for a subscription.
-    // Using a value of 0, is disabling default message-byte dispatch-throttling.
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Default number of message-bytes dispatching throttling-limit for a subscription. \n\n"
+            + "Using a value of 0, is disabling default message-byte dispatch-throttling.")
     private long dispatchThrottlingRatePerSubscribeInByte = 0;
-    // Default dispatch-throttling is disabled for consumers which already caught-up with published messages and
-    // don't have backlog. This enables dispatch-throttling for non-backlog consumers as well.
-    @FieldContext(dynamic = true)
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Default dispatch-throttling is disabled for consumers which already caught-up with"
+            + " published messages and don't have backlog. This enables dispatch-throttling for "
+            + " non-backlog consumers as well.")
     private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;
-    // Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic")
     private int maxConcurrentLookupRequest = 50000;
-    // Max number of concurrent topic loading request broker allows to control number of zk-operations
-    @FieldContext(dynamic = true)
+
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "Max number of concurrent topic loading request broker allows to control number of zk-operations"
+    )
     private int maxConcurrentTopicLoadRequest = 5000;
-    // Max concurrent non-persistent message can be processed per connection
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Max concurrent non-persistent message can be processed per connection")
     private int maxConcurrentNonPersistentMessagePerConnection = 1000;
-    // Number of worker threads to serve non-persistent topic
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Number of worker threads to serve non-persistent topic")
     private int numWorkerThreadsForNonPersistentTopic = Runtime.getRuntime().availableProcessors();;
 
-    // Enable broker to load persistent topics
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Enable broker to load persistent topics"
+    )
     private boolean enablePersistentTopics = true;
 
-    // Enable broker to load non-persistent topics
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Enable broker to load non-persistent topics"
+    )
     private boolean enableNonPersistentTopics = true;
 
-    // Enable to run bookie along with broker
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Enable to run bookie along with broker"
+    )
     private boolean enableRunBookieTogether = false;
 
-    // Enable to run bookie autorecovery along with broker
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Enable to run bookie autorecovery along with broker"
+    )
     private boolean enableRunBookieAutoRecoveryTogether = false;
 
-    // Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
-    // until the number of connected producers decrease.
-    // Using a value of 0, is disabling maxProducersPerTopic-limit check.
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Max number of producers allowed to connect to topic. \n\nOnce this limit reaches,"
+            + " Broker will reject new producers until the number of connected producers decrease."
+            + " Using a value of 0, is disabling maxProducersPerTopic-limit check.")
     private int maxProducersPerTopic = 0;
 
-    // Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers
-    // until the number of connected consumers decrease.
-    // Using a value of 0, is disabling maxConsumersPerTopic-limit check.
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Max number of consumers allowed to connect to topic. \n\nOnce this limit reaches,"
+            + " Broker will reject new consumers until the number of connected consumers decrease."
+            + " Using a value of 0, is disabling maxConsumersPerTopic-limit check.")
     private int maxConsumersPerTopic = 0;
 
-    // Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
-    // until the number of connected consumers decrease.
-    // Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Max number of consumers allowed to connect to subscription. \n\nOnce this limit reaches,"
+            + " Broker will reject new consumers until the number of connected consumers decrease."
+            + " Using a value of 0, is disabling maxConsumersPerSubscription-limit check.")
     private int maxConsumersPerSubscription = 0;
 
     /***** --- TLS --- ****/
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Enable TLS"
+    )
     @Deprecated
     private boolean tlsEnabled = false;
-    // Path for the TLS certificate file
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Path for the TLS certificate file"
+    )
     private String tlsCertificateFilePath;
-    // Path for the TLS private key file
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Path for the TLS private key file"
+    )
     private String tlsKeyFilePath;
-    // Path for the trusted TLS certificate file
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Path for the trusted TLS certificate file"
+    )
     private String tlsTrustCertsFilePath = "";
-    // Accept untrusted TLS certificate from client
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Accept untrusted TLS certificate from client"
+    )
     private boolean tlsAllowInsecureConnection = false;
-    // Specify the tls protocols the broker will use to negotiate during TLS Handshake.
-    // Example:- [TLSv1.2, TLSv1.1, TLSv1]
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Specify the tls protocols the broker will use to negotiate during TLS Handshake.\n\n"
+            + "Example:- [TLSv1.2, TLSv1.1, TLSv1]"
+    )
     private Set<String> tlsProtocols = Sets.newTreeSet();
-    // Specify the tls cipher the broker will use to negotiate during TLS Handshake.
-    // Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Specify the tls cipher the broker will use to negotiate during TLS Handshake.\n\n"
+            + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
+    )
     private Set<String> tlsCiphers = Sets.newTreeSet();
-    // Specify whether Client certificates are required for TLS
-    // Reject the Connection if the Client Certificate is not trusted.
+    @FieldContext(
+        category = CATEGORY_TLS,
+        doc = "Specify whether Client certificates are required for TLS Reject.\n"
+            + "the Connection if the Client Certificate is not trusted")
     private boolean tlsRequireTrustedClientCertOnConnect = false;
 
     /***** --- Authentication --- ****/
-    // Enable authentication
+    @FieldContext(
+        category = CATEGORY_AUTHENTICATION,
+        doc = "Enable authentication"
+    )
     private boolean authenticationEnabled = false;
-    // Autentication provider name list, which is a list of class names
+    @FieldContext(
+        category = CATEGORY_AUTHENTICATION,
+        doc = "Autentication provider name list, which is a list of class names"
+    )
     private Set<String> authenticationProviders = Sets.newTreeSet();
 
-    // Enforce authorization
+    @FieldContext(
+        category = CATEGORY_AUTHORIZATION,
+        doc = "Enforce authorization"
+    )
     private boolean authorizationEnabled = false;
-    // Authorization provider fully qualified class-name
+    @FieldContext(
+        category = CATEGORY_AUTHORIZATION,
+        doc = "Authorization provider fully qualified class-name"
+    )
     private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
 
-    // Role names that are treated as "super-user", meaning they will be able to
-    // do all admin operations and publish/consume from all topics
+    @FieldContext(
+        category = CATEGORY_AUTHORIZATION,
+        doc = "Role names that are treated as `super-user`, meaning they will be able to"
+            + " do all admin operations and publish/consume from all topics"
+    )
     private Set<String> superUserRoles = Sets.newTreeSet();
 
-    // Role names that are treated as "proxy roles". If the broker sees a request with
-    // role as proxyRoles - it will demand to see the original client role or certificate.
+    @FieldContext(
+        category = CATEGORY_AUTHORIZATION,
+        doc = "Role names that are treated as `proxy roles`. \n\nIf the broker sees"
+            + " a request with role as proxyRoles - it will demand to see the original"
+            + " client role or certificate.")
     private Set<String> proxyRoles = Sets.newTreeSet();
 
-    // If this flag is set then the broker authenticates the original Auth data
-    // else it just accepts the originalPrincipal and authorizes it (if required).
+    @FieldContext(
+        category = CATEGORY_AUTHORIZATION,
+        doc = "If this flag is set then the broker authenticates the original Auth data"
+            + " else it just accepts the originalPrincipal and authorizes it (if required)")
     private boolean authenticateOriginalAuthData = false;
 
-    // Allow wildcard matching in authorization
-    // (wildcard matching only applicable if wildcard-char:
-    // * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
+    @FieldContext(
+        category = CATEGORY_AUTHORIZATION,
+        doc = "Allow wildcard matching in authorization\n\n"
+            + "(wildcard matching only applicable if wildcard-char: * presents at first"
+            + " or last position eg: *.pulsar.service, pulsar.service.*)")
     private boolean authorizationAllowWildcardsMatching = false;
 
-    // Authentication settings of the broker itself. Used when the broker connects
-    // to other brokers, either in same or other clusters. Default uses plugin which disables authentication
+    @FieldContext(
+        category = CATEGORY_AUTHENTICATION,
+        doc = "Authentication settings of the broker itself. \n\nUsed when the broker connects"
+            + " to other brokers, either in same or other clusters. Default uses plugin which disables authentication"
+    )
     private String brokerClientAuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationDisabled";
+    @FieldContext(
+        category = CATEGORY_AUTHENTICATION,
+        doc = "Authentication parameters of the authentication plugin the broker is using to connect to other brokers"
+    )
     private String brokerClientAuthenticationParameters = "";
-    // Path for the trusted TLS certificate file for outgoing connection to a server (broker)
+    @FieldContext(
+        category = CATEGORY_AUTHENTICATION,
+        doc = "Path for the trusted TLS certificate file for outgoing connection to a server (broker)")
     private String brokerClientTrustCertsFilePath = "";
 
-    // When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+    @FieldContext(
+        category = CATEGORY_AUTHORIZATION,
+        doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole"
+    )
     private String anonymousUserRole = null;
 
     /**** --- BookKeeper Client --- ****/
-    // Authentication plugin to use when connecting to bookies
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Authentication plugin to use when connecting to bookies"
+    )
     private String bookkeeperClientAuthenticationPlugin;
-    // BookKeeper auth plugin implementatation specifics parameters name and values
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "BookKeeper auth plugin implementatation specifics parameters name and values"
+    )
     private String bookkeeperClientAuthenticationParametersName;
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Parameters for bookkeeper auth plugin"
+    )
     private String bookkeeperClientAuthenticationParameters;
 
-    // Timeout for BK add / read operations
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Timeout for BK add / read operations"
+    )
     private long bookkeeperClientTimeoutInSeconds = 30;
-    // Speculative reads are initiated if a read request doesn't complete within
-    // a certain time Using a value of 0, is disabling the speculative reads
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Speculative reads are initiated if a read request doesn't complete within"
+            + " a certain time Using a value of 0, is disabling the speculative reads")
     private int bookkeeperClientSpeculativeReadTimeoutInMillis = 0;
-    // Use older Bookkeeper wire protocol with bookie
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Use older Bookkeeper wire protocol with bookie"
+    )
     private boolean bookkeeperUseV2WireProtocol = true;
-    // Enable bookies health check. Bookies that have more than the configured
-    // number of failure within the interval will be quarantined for some time.
-    // During this period, new ledgers won't be created on these bookies
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Enable bookies health check. \n\n Bookies that have more than the configured"
+            + " number of failure within the interval will be quarantined for some time."
+            + " During this period, new ledgers won't be created on these bookies")
     private boolean bookkeeperClientHealthCheckEnabled = true;
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Bookies health check interval in seconds"
+    )
     private long bookkeeperClientHealthCheckIntervalSeconds = 60;
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Bookies health check error threshold per check interval"
+    )
     private long bookkeeperClientHealthCheckErrorThresholdPerInterval = 5;
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Bookie health check quarantined time in seconds"
+    )
     private long bookkeeperClientHealthCheckQuarantineTimeInSeconds = 1800;
-    // Enable rack-aware bookie selection policy. BK will chose bookies from
-    // different racks when forming a new bookie ensemble
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Enable rack-aware bookie selection policy. \n\nBK will chose bookies from"
+            + " different racks when forming a new bookie ensemble")
     private boolean bookkeeperClientRackawarePolicyEnabled = true;
-    // Enable region-aware bookie selection policy. BK will chose bookies from
-    // different regions and racks when forming a new bookie ensemble
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Enable region-aware bookie selection policy. \n\nBK will chose bookies from"
+            + " different regions and racks when forming a new bookie ensemble")
     private boolean bookkeeperClientRegionawarePolicyEnabled = false;
-    // Enable/disable reordering read sequence on reading entries.
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "Enable/disable reordering read sequence on reading entries")
     private boolean bookkeeperClientReorderReadSequenceEnabled = false;
-    // Enable bookie isolation by specifying a list of bookie groups to choose
-    // from. Any bookie outside the specified groups will not be used by the
-    // broker
-    @FieldContext(required = false)
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        required = false,
+        doc = "Enable bookie isolation by specifying a list of bookie groups to choose from. \n\n"
+            + "Any bookie outside the specified groups will not be used by the broker")
     private String bookkeeperClientIsolationGroups;
 
     /**** --- Managed Ledger --- ****/
-    // Number of bookies to use when creating a ledger
-    @FieldContext(minValue = 1)
+    @FieldContext(
+        minValue = 1,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Number of bookies to use when creating a ledger"
+    )
     private int managedLedgerDefaultEnsembleSize = 1;
-    // Number of copies to store for each message
-    @FieldContext(minValue = 1)
+    @FieldContext(
+        minValue = 1,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Number of copies to store for each message"
+    )
     private int managedLedgerDefaultWriteQuorum = 1;
-    // Number of guaranteed copies (acks to wait before write is complete)
-    @FieldContext(minValue = 1)
+    @FieldContext(
+        minValue = 1,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Number of guaranteed copies (acks to wait before write is complete)"
+    )
     private int managedLedgerDefaultAckQuorum = 1;
 
-    // Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
-    // Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
+    //
+    //
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Default type of checksum to use when writing to BookKeeper. \n\nDefault is `CRC32C`."
+            + " Other possible options are `CRC32`, `MAC` or `DUMMY` (no checksum)."
+    )
     private DigestType managedLedgerDigestType = DigestType.CRC32C;
 
-    // Max number of bookies to use when creating a ledger
-    @FieldContext(minValue = 1)
+    @FieldContext(
+        minValue = 1,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Max number of bookies to use when creating a ledger"
+    )
     private int managedLedgerMaxEnsembleSize = 5;
-    // Max number of copies to store for each message
-    @FieldContext(minValue = 1)
+    @FieldContext(
+        minValue = 1,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Max number of copies to store for each message"
+    )
     private int managedLedgerMaxWriteQuorum = 5;
-    // Max number of guaranteed copies (acks to wait before write is complete)
-    @FieldContext(minValue = 1)
+    @FieldContext(
+        minValue = 1,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Max number of guaranteed copies (acks to wait before write is complete)"
+    )
     private int managedLedgerMaxAckQuorum = 5;
-    // Amount of memory to use for caching data payload in managed ledger. This
-    // memory
-    // is allocated from JVM direct memory and it's shared across all the topics
-    // running in the same broker
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
+            + " memory is allocated from JVM direct memory and it's shared across all the topics"
+            + " running in the same broker")
     private int managedLedgerCacheSizeMB = 1024;
-    // Threshold to which bring down the cache level when eviction is triggered
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Threshold to which bring down the cache level when eviction is triggered"
+    )
     private double managedLedgerCacheEvictionWatermark = 0.9f;
-    // Rate limit the amount of writes per second generated by consumer acking the messages
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Rate limit the amount of writes per second generated by consumer acking the messages"
+    )
     private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
 
-    // Number of threads to be used for managed ledger tasks dispatching
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Number of threads to be used for managed ledger tasks dispatching"
+    )
     private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors();
-    // Number of threads to be used for managed ledger scheduled tasks
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Number of threads to be used for managed ledger scheduled tasks"
+    )
     private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors();
 
-    // Max number of entries to append to a ledger before triggering a rollover
-    // A ledger rollover is triggered on these conditions Either the max
-    // rollover time has been reached or max entries have been written to the
-    // ledged and at least min-time has passed
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Max number of entries to append to a ledger before triggering a rollover.\n\n"
+            + "A ledger rollover is triggered on these conditions Either the max"
+            + " rollover time has been reached or max entries have been written to the"
+            + " ledged and at least min-time has passed")
     private int managedLedgerMaxEntriesPerLedger = 50000;
-    // Minimum time between ledger rollover for a topic
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Minimum time between ledger rollover for a topic"
+    )
     private int managedLedgerMinLedgerRolloverTimeMinutes = 10;
-    // Maximum time before forcing a ledger rollover for a topic
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Maximum time before forcing a ledger rollover for a topic"
+    )
     private int managedLedgerMaxLedgerRolloverTimeMinutes = 240;
-    // Delay between a ledger being successfully offloaded to long term storage
-    // and the ledger being deleted from bookkeeper
+    @FieldContext(
+        category = CATEGORY_STORAGE_OFFLOADING,
+        doc = "Delay between a ledger being successfully offloaded to long term storage,"
+            + " and the ledger being deleted from bookkeeper"
+    )
     private long managedLedgerOffloadDeletionLagMs = TimeUnit.HOURS.toMillis(4);
-    // Max number of entries to append to a cursor ledger
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Max number of entries to append to a cursor ledger"
+    )
     private int managedLedgerCursorMaxEntriesPerLedger = 50000;
-    // Max time before triggering a rollover on a cursor ledger
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Max time before triggering a rollover on a cursor ledger"
+    )
     private int managedLedgerCursorRolloverTimeInSeconds = 14400;
-    // Max number of "acknowledgment holes" that are going to be persistently stored.
-    // When acknowledging out of order, a consumer will leave holes that are supposed
-    // to be quickly filled by acking all the messages. The information of which
-    // messages are acknowledged is persisted by compressing in "ranges" of messages
-    // that were acknowledged. After the max number of ranges is reached, the information
-    // will only be tracked in memory and messages will be redelivered in case of
-    // crashes.
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Max number of `acknowledgment holes` that are going to be persistently stored.\n\n"
+            + "When acknowledging out of order, a consumer will leave holes that are supposed"
+            + " to be quickly filled by acking all the messages. The information of which"
+            + " messages are acknowledged is persisted by compressing in `ranges` of messages"
+            + " that were acknowledged. After the max number of ranges is reached, the information"
+            + " will only be tracked in memory and messages will be redelivered in case of"
+            + " crashes.")
     private int managedLedgerMaxUnackedRangesToPersist = 10000;
-    // Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
-    // than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
-    // zookeeper.
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "Max number of `acknowledgment holes` that can be stored in Zookeeper.\n\n"
+            + "If number of unack message range is higher than this limit then broker will persist"
+            + " unacked ranges into bookkeeper to avoid additional data overhead into zookeeper.")
     private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;
-    // Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
-    // corrupted at bookkeeper and managed-cursor is stuck at that ledger.
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_STORAGE_ML,
+        doc = "Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.\n\n"
+            + " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger."
+    )
     private boolean autoSkipNonRecoverableData = false;
-    // operation timeout while updating managed-ledger metadata.
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
+        doc = "operation timeout while updating managed-ledger metadata."
+    )
     private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
 
     /*** --- Load balancer --- ****/
-    // Enable load balancer
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Enable load balancer"
+    )
     private boolean loadBalancerEnabled = true;
-    // load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)
     @Deprecated
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        deprecated = true,
+        doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
+    )
     private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection
-    // Percentage of change to trigger load report update
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Percentage of change to trigger load report update"
+    )
     private int loadBalancerReportUpdateThresholdPercentage = 10;
-    // maximum interval to update load report
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "maximum interval to update load report"
+    )
     private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
-    // Frequency of report to collect
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Frequency of report to collect, in minutes"
+    )
     private int loadBalancerHostUsageCheckIntervalMinutes = 1;
-    // Enable/disable automatic bundle unloading for load-shedding
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Enable/disable automatic bundle unloading for load-shedding"
+    )
     private boolean loadBalancerSheddingEnabled = true;
-    // Load shedding interval. Broker periodically checks whether some traffic should be offload from some over-loaded
-    // broker to other under-loaded brokers
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Load shedding interval. \n\nBroker periodically checks whether some traffic"
+            + " should be offload from some over-loaded broker to other under-loaded brokers"
+    )
     private int loadBalancerSheddingIntervalMinutes = 1;
-    // Prevent the same topics to be shed and moved to other broker more that
-    // once within this timeframe
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Prevent the same topics to be shed and moved to other broker more that"
+            + " once within this timeframe"
+    )
     private long loadBalancerSheddingGracePeriodMinutes = 30;
-    // Usage threshold to determine a broker as under-loaded (only used by SimpleLoadManagerImpl)
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        deprecated = true,
+        doc = "Usage threshold to determine a broker as under-loaded (only used by SimpleLoadManagerImpl)"
+    )
     @Deprecated
     private int loadBalancerBrokerUnderloadedThresholdPercentage = 50;
-    // Usage threshold to allocate max number of topics to broker
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Usage threshold to allocate max number of topics to broker"
+    )
     private int loadBalancerBrokerMaxTopics = 50000;
 
-    // Usage threshold to determine a broker as over-loaded
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Usage threshold to determine a broker as over-loaded"
+    )
     private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
-    // Interval to flush dynamic resource quota to ZooKeeper
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Interval to flush dynamic resource quota to ZooKeeper"
+    )
     private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15;
-    // Usage threshold to determine a broker is having just right level of load (only used by SimpleLoadManagerImpl)
     @Deprecated
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        deprecated = true,
+        doc = "Usage threshold to determine a broker is having just right level of load"
+            + " (only used by SimpleLoadManagerImpl)"
+    )
     private int loadBalancerBrokerComfortLoadLevelPercentage = 65;
-    // enable/disable automatic namespace bundle split
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "enable/disable automatic namespace bundle split"
+    )
     private boolean loadBalancerAutoBundleSplitEnabled = true;
-    // enable/disable automatic unloading of split bundles
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "enable/disable automatic unloading of split bundles"
+    )
     private boolean loadBalancerAutoUnloadSplitBundlesEnabled = true;
-    // maximum topics in a bundle, otherwise bundle split will be triggered
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "maximum topics in a bundle, otherwise bundle split will be triggered"
+    )
     private int loadBalancerNamespaceBundleMaxTopics = 1000;
-    // maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered"
+    )
     private int loadBalancerNamespaceBundleMaxSessions = 1000;
-    // maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered"
+    )
     private int loadBalancerNamespaceBundleMaxMsgRate = 30000;
-    // maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered"
+    )
     private int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100;
-    // maximum number of bundles in a namespace
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "maximum number of bundles in a namespace"
+    )
     private int loadBalancerNamespaceMaximumBundles = 128;
-    // Name of load manager to use
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Name of load manager to use"
+    )
     private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl";
 
-    // Option to override the auto-detected network interfaces max speed
+    @FieldContext(
+        category = CATEGORY_LOAD_BALANCER,
+        doc = "Option to override the auto-detected network interfaces max speed"
+    )
     private Double loadBalancerOverrideBrokerNicSpeedGbps;
 
     /**** --- Replication --- ****/
-    // Enable replication metrics
+    @FieldContext(
+        category = CATEGORY_REPLICATION,
+        doc = "Enable replication metrics"
+    )
     private boolean replicationMetricsEnabled = false;
-    // Max number of connections to open for each broker in a remote cluster
-    // More connections host-to-host lead to better throughput over high-latency
-    // links.
+    @FieldContext(
+        category = CATEGORY_REPLICATION,
+        doc = "Max number of connections to open for each broker in a remote cluster.\n\n"
+            + "More connections host-to-host lead to better throughput over high-latency links"
+    )
     private int replicationConnectionsPerBroker = 16;
-    @FieldContext(required = false)
-    // replicator prefix used for replicator producer name and cursor name
+    @FieldContext(
+        required = false,
+        category = CATEGORY_REPLICATION,
+        doc = "replicator prefix used for replicator producer name and cursor name"
+    )
     private String replicatorPrefix = "pulsar.repl";
-    // Replicator producer queue size;
+    @FieldContext(
+        category = CATEGORY_REPLICATION,
+        doc = "Replicator producer queue size"
+    )
     private int replicationProducerQueueSize = 1000;
-    // @deprecated - Use brokerClientTlsEnabled instead.
     @Deprecated
+    @FieldContext(
+        category = CATEGORY_REPLICATION,
+        deprecated = true,
+        doc = "@deprecated - Use brokerClientTlsEnabled instead."
+    )
     private boolean replicationTlsEnabled = false;
-    // Enable TLS when talking with other brokers in the same cluster (admin operation) or different clusters (replication)
+    @FieldContext(
+        category = CATEGORY_REPLICATION,
+        doc = "Enable TLS when talking with other brokers in the same cluster (admin operation)"
+            + " or different clusters (replication)"
+    )
     private boolean brokerClientTlsEnabled = false;
 
-    // Default message retention time
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Default message retention time"
+    )
     private int defaultRetentionTimeInMinutes = 0;
-    // Default retention size
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "Default retention size"
+    )
     private int defaultRetentionSizeInMB = 0;
-    // How often to check pulsar connection is still alive
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "How often to check pulsar connection is still alive"
+    )
     private int keepAliveIntervalSeconds = 30;
-    // How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)"
+    )
     private int brokerServicePurgeInactiveFrequencyInSeconds = 60;
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "A comma-separated list of namespaces to bootstrap"
+    )
     private List<String> bootstrapNamespaces = new ArrayList<String>();
     private Properties properties = new Properties();
-    // If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to
-    // use only brokers running the latest software version (to minimize impact to bundles)
-    @FieldContext(dynamic = true)
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_SERVER,
+        doc = "If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to "
+            + "use only brokers running the latest software version (to minimize impact to bundles)"
+    )
     private boolean preferLaterVersions = false;
 
-    // Interval between checks to see if topics with compaction policies need to be compacted
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "Interval between checks to see if topics with compaction policies need to be compacted"
+    )
     private int brokerServiceCompactionMonitorIntervalInSeconds = 60;
 
+    @FieldContext(
+        category = CATEGORY_SCHEMA,
+        doc = "Enforce schema validation on following cases:\n\n"
+            + " - if a producer without a schema attempts to produce to a topic with schema, the producer will be\n"
+            + "   failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema.\n"
+            + "   if you enable this setting, it will cause non-java clients failed to produce."
+    )
     private boolean isSchemaValidationEnforced = false;
+    @FieldContext(
+        category = CATEGORY_SCHEMA,
+        doc = "The schema storage implementation used by this broker"
+    )
     private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+    @FieldContext(
+        category = CATEGORY_SCHEMA,
+        doc = "The list compatibility checkers to be used in schema registry"
+    )
     private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
             "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
             "org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck"
     );
 
     /**** --- WebSocket --- ****/
-    // Number of IO threads in Pulsar Client used in WebSocket proxy
+    @FieldContext(
+        category = CATEGORY_WEBSOCKET,
+        doc = "Number of IO threads in Pulsar Client used in WebSocket proxy"
+    )
     private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();
-    // Number of connections per Broker in Pulsar Client used in WebSocket proxy
+    @FieldContext(
+        category = CATEGORY_WEBSOCKET,
+        doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy"
+    )
     private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();
-    // Time in milliseconds that idle WebSocket session times out
+    @FieldContext(
+        category = CATEGORY_WEBSOCKET,
+        doc = "Time in milliseconds that idle WebSocket session times out"
+    )
     private int webSocketSessionIdleTimeoutMillis = 300000;
 
     /**** --- Metrics --- ****/
-    // If true, export topic level metrics otherwise namespace level
+    @FieldContext(
+        category = CATEGORY_METRICS,
+        doc = "If true, export topic level metrics otherwise namespace level"
+    )
     private boolean exposeTopicLevelMetricsInPrometheus = true;
+    @FieldContext(
+        category = CATEGORY_METRICS,
+        doc = "If true, export consumer level metrics otherwise namespace level"
+    )
     private boolean exposeConsumerLevelMetricsInPrometheus = false;
 
     /**** --- Functions --- ****/
+    @FieldContext(
+        category = CATEGORY_FUNCTIONS,
+        doc = "Flag indicates enabling or disabling function worker on brokers"
+    )
     private boolean functionsWorkerEnabled = false;
 
     /**** --- Broker Web Stats --- ****/
-    // If true, export publisher stats when returning topics stats from the admin rest api
+    @FieldContext(
+        category = CATEGORY_METRICS,
+        doc = "If true, export publisher stats when returning topics stats from the admin rest api"
+    )
     private boolean exposePublisherStats = true;
+    @FieldContext(
+        category = CATEGORY_METRICS,
+        doc = "Stats update frequency in seconds"
+    )
     private int statsUpdateFrequencyInSecs = 60;
+    @FieldContext(
+        category = CATEGORY_METRICS,
+        doc = "Stats update initial delay in seconds"
+    )
     private int statsUpdateInitialDelayInSecs = 60;
 
     /**** --- Ledger Offloading --- ****/
@@ -526,13 +1039,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
      * NOTES: all implementation related settings should be put in implementation package.
      *        only common settings like driver name, io threads can be added here.
      ****/
-    // The directory to locate offloaders
+    @FieldContext(
+        category = CATEGORY_STORAGE_OFFLOADING,
+        doc = "The directory to locate offloaders"
+    )
     private String offloadersDirectory = "./offloaders";
 
-    // Driver to use to offload old data to long term storage
+    @FieldContext(
+        category = CATEGORY_STORAGE_OFFLOADING,
+        doc = "Driver to use to offload old data to long term storage"
+    )
     private String managedLedgerOffloadDriver = null;
 
-    // Maximum number of thread pool threads for ledger offloading
+    @FieldContext(
+        category = CATEGORY_STORAGE_OFFLOADING,
+        doc = "Maximum number of thread pool threads for ledger offloading"
+    )
     private int managedLedgerOffloadMaxThreads = 2;
 
     /**
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index df0a1ec..aad9a2f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
@@ -180,8 +181,10 @@ public class PulsarConfigurationLoader {
                 try {
                     Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName());
                     confField.setAccessible(true);
-                    convertedConfField.setAccessible(true);
-                    convertedConfField.set(convertedConf, confField.get(conf));
+                    if (!Modifier.isStatic(convertedConfField.getModifiers())) {
+                        convertedConfField.setAccessible(true);
+                        convertedConfField.set(convertedConf, confField.get(conf));
+                    }
                 } catch (NoSuchFieldException e) {
                     if (!ignoreNonExistMember) {
                         throw new IllegalArgumentException("Exception caused while converting configuration: " + e.getMessage());