You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/20 10:22:34 UTC
[pulsar] branch master updated: [conf] Add annotations for
documenting broker configuration settings (#3113)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9c3f2a7 [conf] Add annotations for documenting broker configuration settings (#3113)
9c3f2a7 is described below
commit 9c3f2a77bb352c073d2dc36094b42dc3c9918c3b
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Thu Dec 20 18:22:29 2018 +0800
[conf] Add annotations for documenting broker configuration settings (#3113)
*Motivation*
This change is adding annotations to broker configuration for generating broker configuration file.
---
.../apache/pulsar/broker/ServiceConfiguration.java | 1036 +++++++++++++++-----
.../configuration/PulsarConfigurationLoader.java | 7 +-
2 files changed, 784 insertions(+), 259 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3939ceb..75ee5d2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -30,6 +30,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -41,484 +42,996 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
@Setter
public class ServiceConfiguration implements PulsarConfiguration {
+ @Category
+ private static final String CATEGORY_SERVER = "Server";
+ @Category
+ private static final String CATEGORY_STORAGE_BK = "Storage (BookKeeper)";
+ @Category
+ private static final String CATEGORY_STORAGE_ML = "Storage (Managed Ledger)";
+ @Category
+ private static final String CATEGORY_STORAGE_OFFLOADING = "Storage (Ledger Offloading)";
+ @Category
+ private static final String CATEGORY_POLICIES = "Policies";
+ @Category
+ private static final String CATEGORY_WEBSOCKET = "WebSocket";
+ @Category
+ private static final String CATEGORY_SCHEMA = "Schema";
+ @Category
+ private static final String CATEGORY_METRICS = "Metrics";
+ @Category
+ private static final String CATEGORY_REPLICATION = "Replication";
+ @Category
+ private static final String CATEGORY_LOAD_BALANCER = "Load Balancer";
+ @Category
+ private static final String CATEGORY_FUNCTIONS = "Functions";
+ @Category
+ private static final String CATEGORY_TLS = "TLS";
+ @Category
+ private static final String CATEGORY_AUTHENTICATION = "Authentication";
+ @Category
+ private static final String CATEGORY_AUTHORIZATION = "Authorization";
+ @Category
+ private static final String CATEGORY_TOKEN_AUTH = "Token Authentication Provider";
+ @Category
+ private static final String CATEGORY_HTTP = "HTTP";
+
/***** --- pulsar configuration --- ****/
- // Zookeeper quorum connection string
- @FieldContext(required = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ required = true,
+ doc = "The Zookeeper quorum connection string (as a comma-separated list)"
+ )
private String zookeeperServers;
- // Global Zookeeper quorum connection string
@Deprecated
- @FieldContext(required = false)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ required = false,
+ deprecated = true,
+ doc = "Global Zookeeper quorum connection string (as a comma-separated list)."
+ + " Deprecated in favor of using `configurationStoreServers`"
+ )
private String globalZookeeperServers;
- // Configuration Store connection string
- @FieldContext(required = false)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ required = false,
+ doc = "Configuration store connection string (as a comma-separated list)"
+ )
private String configurationStoreServers;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The port for serving binary protobuf requests"
+ )
private Integer brokerServicePort = 6650;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The port for serving tls secured binary protobuf requests"
+ )
private Integer brokerServicePortTls = null;
- // Port to use to server HTTP request
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The port for serving http requests"
+ )
private Integer webServicePort = 8080;
- // Port to use to server HTTPS request
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The port for serving https requests"
+ )
private Integer webServicePortTls = null;
- // Hostname or IP address the service binds on.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Hostname or IP address the service binds on"
+ )
private String bindAddress = "0.0.0.0";
- // Controls which hostname is advertised to the discovery service via ZooKeeper.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Hostname or IP address the service advertises to the outside world."
+ + " If not set, the value of `InetAddress.getLocalHost().getHostname()` is used."
+ )
private String advertisedAddress;
- // Number of threads to use for Netty IO
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Number of threads to use for Netty IO."
+ + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`"
+ )
private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
- // Enable the WebSocket API service
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Enable the WebSocket API service in broker"
+ )
private boolean webSocketServiceEnabled = false;
- // Flag to control features that are meant to be used when running in standalone mode
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Flag indicates whether to run broker in standalone mode"
+ )
private boolean isRunningStandalone = false;
- // Name of the cluster to which this broker belongs to
- @FieldContext(required = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ required = true,
+ doc = "Name of the cluster to which this broker belongs to"
+ )
private String clusterName;
- // Enable cluster's failure-domain which can distribute brokers into logical region
- @FieldContext(dynamic = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ dynamic = true,
+ doc = "Enable cluster's failure-domain which can distribute brokers into logical region"
+ )
private boolean failureDomainsEnabled = false;
- // Zookeeper session timeout in milliseconds
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "ZooKeeper session timeout in milliseconds"
+ )
private long zooKeeperSessionTimeoutMillis = 30000;
- // Time to wait for broker graceful shutdown. After this time elapses, the
- // process will be killed
- @FieldContext(dynamic = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ dynamic = true,
+ doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
+ )
private long brokerShutdownTimeoutMs = 60000;
- // Enable backlog quota check. Enforces action on topic when the quota is
- // reached
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Enable backlog quota check. Enforces actions on topic when the quota is reached"
+ )
private boolean backlogQuotaCheckEnabled = true;
- // How often to check for topics that have reached the quota
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How often to check for topics that have reached the quota."
+ + " It only takes effects when `backlogQuotaCheckEnabled` is true"
+ )
private int backlogQuotaCheckIntervalInSeconds = 60;
- // Default per-topic backlog quota limit
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default per-topic backlog quota limit. Increase it if you want to allow larger msg backlog"
+ )
private long backlogQuotaDefaultLimitGB = 50;
- //Default backlog quota retention policy. Default is producer_request_hold
- //'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
- //'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
- //'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default backlog quota retention policy. Default is producer_request_hold\n\n"
+ + "'producer_request_hold' Policy which holds producer's send request until the"
+ + "resource becomes available (or holding times out)\n"
+ + "'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer\n"
+ + "'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog"
+ )
private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy.producer_request_hold;
- // Enable the deletion of inactive topics
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Enable the deletion of inactive topics"
+ )
private boolean brokerDeleteInactiveTopicsEnabled = true;
- // How often to check for inactive topics
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How often to check for inactive topics"
+ )
private long brokerDeleteInactiveTopicsFrequencySeconds = 60;
- // How frequently to proactively check and purge expired messages
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How frequently to proactively check and purge expired messages"
+ )
private int messageExpiryCheckIntervalInMinutes = 5;
- // How long to delay rewinding cursor and dispatching messages when active consumer is changed
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed"
+ )
private int activeConsumerFailoverDelayTimeMillis = 1000;
- // How long to delete inactive subscriptions from last consuming
- // When it is 0, inactive subscriptions are not deleted automatically
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How long to delete inactive subscriptions from last consuming."
+ + " When it is 0, inactive subscriptions are not deleted automatically"
+ )
private long subscriptionExpirationTimeMinutes = 0;
- // How frequently to proactively check and purge expired subscription
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How frequently to proactively check and purge expired subscription"
+ )
private long subscriptionExpiryCheckIntervalInMinutes = 5;
- // Set the default behavior for message deduplication in the broker
- // This can be overridden per-namespace. If enabled, broker will reject
- // messages that were already stored in the topic
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Set the default behavior for message deduplication in the broker.\n\n"
+ + "This can be overridden per-namespace. If enabled, broker will reject"
+ + " messages that were already stored in the topic"
+ )
private boolean brokerDeduplicationEnabled = false;
- // Maximum number of producer information that it's going to be
- // persisted for deduplication purposes
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Maximum number of producer information that it's going to be persisted for deduplication purposes"
+ )
private int brokerDeduplicationMaxNumberOfProducers = 10000;
- // Number of entries after which a dedup info snapshot is taken.
- // A bigger interval will lead to less snapshots being taken though it would
- // increase the topic recovery time, when the entries published after the
- // snapshot need to be replayed
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Number of entries after which a dedup info snapshot is taken.\n\n"
+ + "A bigger interval will lead to less snapshots being taken though it would"
+ + " increase the topic recovery time, when the entries published after the"
+ + " snapshot need to be replayed"
+ )
private int brokerDeduplicationEntriesInterval = 1000;
- // Time of inactivity after which the broker will discard the deduplication information
- // relative to a disconnected producer. Default is 6 hours.
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Time of inactivity after which the broker will discard the deduplication information"
+ + " relative to a disconnected producer. Default is 6 hours.")
private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;
- // When a namespace is created without specifying the number of bundle, this
- // value will be used as the default
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "When a namespace is created without specifying the number of bundle, this"
+ + " value will be used as the default")
private int defaultNumberOfNamespaceBundles = 4;
- // Enable check for minimum allowed client library version
- @FieldContext(dynamic = true)
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ dynamic = true,
+ doc = "Enable check for minimum allowed client library version"
+ )
private boolean clientLibraryVersionCheckEnabled = false;
- // Path for the file used to determine the rotation status for the broker
- // when responding to service discovery health checks
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Path for the file used to determine the rotation status for the broker"
+ + " when responding to service discovery health checks")
private String statusFilePath;
- // Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker
- // will stop sending messages to consumer once, this limit reaches until consumer starts acknowledging messages back
- // and unack count reaches to maxUnackedMessagesPerConsumer/2 Using a value of 0, is disabling unackedMessage-limit
- // check and consumer can receive messages without any restriction
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Max number of unacknowledged messages allowed to receive messages by a consumer on"
+ + " a shared subscription.\n\n Broker will stop sending messages to consumer once,"
+ + " this limit reaches until consumer starts acknowledging messages back and unack count"
+ + " reaches to `maxUnackedMessagesPerConsumer/2`. Using a value of 0, it is disabling "
+ + " unackedMessage-limit check and consumer can receive messages without any restriction")
private int maxUnackedMessagesPerConsumer = 50000;
- // Max number of unacknowledged messages allowed per shared subscription. Broker will stop dispatching messages to
- // all consumers of the subscription once this limit reaches until consumer starts acknowledging messages back and
- // unack count reaches to limit/2. Using a value of 0, is disabling unackedMessage-limit
- // check and dispatcher can dispatch messages without any restriction
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Max number of unacknowledged messages allowed per shared subscription. \n\n"
+ + " Broker will stop dispatching messages to all consumers of the subscription once this "
+ + " limit reaches until consumer starts acknowledging messages back and unack count reaches"
+ + " to `limit/2`. Using a value of 0, is disabling unackedMessage-limit check and dispatcher"
+ + " can dispatch messages without any restriction")
private int maxUnackedMessagesPerSubscription = 4 * 50000;
- // Max number of unacknowledged messages allowed per broker. Once this limit reaches, broker will stop dispatching
- // messages to all shared subscription which has higher number of unack messages until subscriptions start
- // acknowledging messages back and unack count reaches to limit/2. Using a value of 0, is disabling
- // unackedMessage-limit check and broker doesn't block dispatchers
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Max number of unacknowledged messages allowed per broker. \n\n"
+ + " Once this limit reaches, broker will stop dispatching messages to all shared subscription "
+ + " which has higher number of unack messages until subscriptions start acknowledging messages "
+ + " back and unack count reaches to `limit/2`. Using a value of 0, is disabling unackedMessage-limit"
+ + " check and broker doesn't block dispatchers")
private int maxUnackedMessagesPerBroker = 0;
- // Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
- // than this percentage limit and subscription will not receive any new messages until that subscription acks back
- // limit/2 messages
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
+ + " unacked messages than this percentage limit and subscription will not receive any new messages "
+ + " until that subscription acks back `limit/2` messages")
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
- // Too many subscribe requests from a consumer can cause broker rewinding consumer cursors and loading data from bookies,
- // hence causing high network bandwidth usage
- // When the positive value is set, broker will throttle the subscribe requests for one consumer.
- // Otherwise, the throttling will be disabled. The default value of this setting is 0 - throttling is disabled.
- @FieldContext(dynamic = true)
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ dynamic = true,
+ doc = "Too many subscribe requests from a consumer can cause broker rewinding consumer cursors "
+ + " and loading data from bookies, hence causing high network bandwidth usage When the positive"
+ + " value is set, broker will throttle the subscribe requests for one consumer. Otherwise, the"
+ + " throttling will be disabled. The default value of this setting is 0 - throttling is disabled.")
private int subscribeThrottlingRatePerConsumer = 0;
- // Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
- @FieldContext(minValue = 1, dynamic = true)
+ @FieldContext(
+ minValue = 1,
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s."
+ )
private int subscribeRatePeriodPerConsumerInSecond = 30;
- // Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default
- // message dispatch-throttling
- @FieldContext(dynamic = true)
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default number of message dispatching throttling-limit for every topic. \n\n"
+ + "Using a value of 0, is disabling default message dispatch-throttling")
private int dispatchThrottlingRatePerTopicInMsg = 0;
- // Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling
- // default message-byte dispatch-throttling
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
+ + "Using a value of 0, is disabling default message-byte dispatch-throttling")
private long dispatchThrottlingRatePerTopicInByte = 0;
- // Default number of message dispatching throttling-limit for a subscription.
- // Using a value of 0, is disabling default message dispatch-throttling.
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default number of message dispatching throttling-limit for a subscription. \n\n"
+ + "Using a value of 0, is disabling default message dispatch-throttling.")
private int dispatchThrottlingRatePerSubscriptionInMsg = 0;
- // Default number of message-bytes dispatching throttling-limit for a subscription.
- // Using a value of 0, is disabling default message-byte dispatch-throttling.
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default number of message-bytes dispatching throttling-limit for a subscription. \n\n"
+ + "Using a value of 0, is disabling default message-byte dispatch-throttling.")
private long dispatchThrottlingRatePerSubscribeInByte = 0;
- // Default dispatch-throttling is disabled for consumers which already caught-up with published messages and
- // don't have backlog. This enables dispatch-throttling for non-backlog consumers as well.
- @FieldContext(dynamic = true)
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default dispatch-throttling is disabled for consumers which already caught-up with"
+ + " published messages and don't have backlog. This enables dispatch-throttling for "
+ + " non-backlog consumers as well.")
private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;
- // Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic")
private int maxConcurrentLookupRequest = 50000;
- // Max number of concurrent topic loading request broker allows to control number of zk-operations
- @FieldContext(dynamic = true)
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "Max number of concurrent topic loading request broker allows to control number of zk-operations"
+ )
private int maxConcurrentTopicLoadRequest = 5000;
- // Max concurrent non-persistent message can be processed per connection
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max concurrent non-persistent message can be processed per connection")
private int maxConcurrentNonPersistentMessagePerConnection = 1000;
- // Number of worker threads to serve non-persistent topic
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Number of worker threads to serve non-persistent topic")
private int numWorkerThreadsForNonPersistentTopic = Runtime.getRuntime().availableProcessors();;
- // Enable broker to load persistent topics
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable broker to load persistent topics"
+ )
private boolean enablePersistentTopics = true;
- // Enable broker to load non-persistent topics
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable broker to load non-persistent topics"
+ )
private boolean enableNonPersistentTopics = true;
- // Enable to run bookie along with broker
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable to run bookie along with broker"
+ )
private boolean enableRunBookieTogether = false;
- // Enable to run bookie autorecovery along with broker
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Enable to run bookie autorecovery along with broker"
+ )
private boolean enableRunBookieAutoRecoveryTogether = false;
- // Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
- // until the number of connected producers decrease.
- // Using a value of 0, is disabling maxProducersPerTopic-limit check.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max number of producers allowed to connect to topic. \n\nOnce this limit reaches,"
+ + " Broker will reject new producers until the number of connected producers decrease."
+ + " Using a value of 0, is disabling maxProducersPerTopic-limit check.")
private int maxProducersPerTopic = 0;
- // Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers
- // until the number of connected consumers decrease.
- // Using a value of 0, is disabling maxConsumersPerTopic-limit check.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max number of consumers allowed to connect to topic. \n\nOnce this limit reaches,"
+ + " Broker will reject new consumers until the number of connected consumers decrease."
+ + " Using a value of 0, is disabling maxConsumersPerTopic-limit check.")
private int maxConsumersPerTopic = 0;
- // Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
- // until the number of connected consumers decrease.
- // Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Max number of consumers allowed to connect to subscription. \n\nOnce this limit reaches,"
+ + " Broker will reject new consumers until the number of connected consumers decrease."
+ + " Using a value of 0, is disabling maxConsumersPerSubscription-limit check.")
private int maxConsumersPerSubscription = 0;
/***** --- TLS --- ****/
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Enable TLS"
+ )
@Deprecated
private boolean tlsEnabled = false;
- // Path for the TLS certificate file
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Path for the TLS certificate file"
+ )
private String tlsCertificateFilePath;
- // Path for the TLS private key file
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Path for the TLS private key file"
+ )
private String tlsKeyFilePath;
- // Path for the trusted TLS certificate file
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Path for the trusted TLS certificate file"
+ )
private String tlsTrustCertsFilePath = "";
- // Accept untrusted TLS certificate from client
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Accept untrusted TLS certificate from client"
+ )
private boolean tlsAllowInsecureConnection = false;
- // Specify the tls protocols the broker will use to negotiate during TLS Handshake.
- // Example:- [TLSv1.2, TLSv1.1, TLSv1]
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Specify the tls protocols the broker will use to negotiate during TLS Handshake.\n\n"
+ + "Example:- [TLSv1.2, TLSv1.1, TLSv1]"
+ )
private Set<String> tlsProtocols = Sets.newTreeSet();
- // Specify the tls cipher the broker will use to negotiate during TLS Handshake.
- // Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Specify the tls cipher the broker will use to negotiate during TLS Handshake.\n\n"
+ + "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
+ )
private Set<String> tlsCiphers = Sets.newTreeSet();
- // Specify whether Client certificates are required for TLS
- // Reject the Connection if the Client Certificate is not trusted.
+ @FieldContext(
+ category = CATEGORY_TLS,
+ doc = "Specify whether Client certificates are required for TLS Reject.\n"
+ + "the Connection if the Client Certificate is not trusted")
private boolean tlsRequireTrustedClientCertOnConnect = false;
/***** --- Authentication --- ****/
- // Enable authentication
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Enable authentication"
+ )
private boolean authenticationEnabled = false;
- // Autentication provider name list, which is a list of class names
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Autentication provider name list, which is a list of class names"
+ )
private Set<String> authenticationProviders = Sets.newTreeSet();
- // Enforce authorization
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Enforce authorization"
+ )
private boolean authorizationEnabled = false;
- // Authorization provider fully qualified class-name
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Authorization provider fully qualified class-name"
+ )
private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
- // Role names that are treated as "super-user", meaning they will be able to
- // do all admin operations and publish/consume from all topics
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Role names that are treated as `super-user`, meaning they will be able to"
+ + " do all admin operations and publish/consume from all topics"
+ )
private Set<String> superUserRoles = Sets.newTreeSet();
- // Role names that are treated as "proxy roles". If the broker sees a request with
- // role as proxyRoles - it will demand to see the original client role or certificate.
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Role names that are treated as `proxy roles`. \n\nIf the broker sees"
+ + " a request with role as proxyRoles - it will demand to see the original"
+ + " client role or certificate.")
private Set<String> proxyRoles = Sets.newTreeSet();
- // If this flag is set then the broker authenticates the original Auth data
- // else it just accepts the originalPrincipal and authorizes it (if required).
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "If this flag is set then the broker authenticates the original Auth data"
+ + " else it just accepts the originalPrincipal and authorizes it (if required)")
private boolean authenticateOriginalAuthData = false;
- // Allow wildcard matching in authorization
- // (wildcard matching only applicable if wildcard-char:
- // * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "Allow wildcard matching in authorization\n\n"
+ + "(wildcard matching only applicable if wildcard-char: * presents at first"
+ + " or last position eg: *.pulsar.service, pulsar.service.*)")
private boolean authorizationAllowWildcardsMatching = false;
- // Authentication settings of the broker itself. Used when the broker connects
- // to other brokers, either in same or other clusters. Default uses plugin which disables authentication
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Authentication settings of the broker itself. \n\nUsed when the broker connects"
+ + " to other brokers, either in same or other clusters. Default uses plugin which disables authentication"
+ )
private String brokerClientAuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationDisabled";
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Authentication parameters of the authentication plugin the broker is using to connect to other brokers"
+ )
private String brokerClientAuthenticationParameters = "";
- // Path for the trusted TLS certificate file for outgoing connection to a server (broker)
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "Path for the trusted TLS certificate file for outgoing connection to a server (broker)")
private String brokerClientTrustCertsFilePath = "";
- // When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+ @FieldContext(
+ category = CATEGORY_AUTHORIZATION,
+ doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole"
+ )
private String anonymousUserRole = null;
/**** --- BookKeeper Client --- ****/
- // Authentication plugin to use when connecting to bookies
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Authentication plugin to use when connecting to bookies"
+ )
private String bookkeeperClientAuthenticationPlugin;
- // BookKeeper auth plugin implementatation specifics parameters name and values
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "BookKeeper auth plugin implementatation specifics parameters name and values"
+ )
private String bookkeeperClientAuthenticationParametersName;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Parameters for bookkeeper auth plugin"
+ )
private String bookkeeperClientAuthenticationParameters;
- // Timeout for BK add / read operations
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Timeout for BK add / read operations"
+ )
private long bookkeeperClientTimeoutInSeconds = 30;
- // Speculative reads are initiated if a read request doesn't complete within
- // a certain time Using a value of 0, is disabling the speculative reads
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Speculative reads are initiated if a read request doesn't complete within"
+ + " a certain time Using a value of 0, is disabling the speculative reads")
private int bookkeeperClientSpeculativeReadTimeoutInMillis = 0;
- // Use older Bookkeeper wire protocol with bookie
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Use older Bookkeeper wire protocol with bookie"
+ )
private boolean bookkeeperUseV2WireProtocol = true;
- // Enable bookies health check. Bookies that have more than the configured
- // number of failure within the interval will be quarantined for some time.
- // During this period, new ledgers won't be created on these bookies
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Enable bookies health check. \n\n Bookies that have more than the configured"
+ + " number of failure within the interval will be quarantined for some time."
+ + " During this period, new ledgers won't be created on these bookies")
private boolean bookkeeperClientHealthCheckEnabled = true;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Bookies health check interval in seconds"
+ )
private long bookkeeperClientHealthCheckIntervalSeconds = 60;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Bookies health check error threshold per check interval"
+ )
private long bookkeeperClientHealthCheckErrorThresholdPerInterval = 5;
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Bookie health check quarantined time in seconds"
+ )
private long bookkeeperClientHealthCheckQuarantineTimeInSeconds = 1800;
- // Enable rack-aware bookie selection policy. BK will chose bookies from
- // different racks when forming a new bookie ensemble
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Enable rack-aware bookie selection policy. \n\nBK will chose bookies from"
+ + " different racks when forming a new bookie ensemble")
private boolean bookkeeperClientRackawarePolicyEnabled = true;
- // Enable region-aware bookie selection policy. BK will chose bookies from
- // different regions and racks when forming a new bookie ensemble
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Enable region-aware bookie selection policy. \n\nBK will chose bookies from"
+ + " different regions and racks when forming a new bookie ensemble")
private boolean bookkeeperClientRegionawarePolicyEnabled = false;
- // Enable/disable reordering read sequence on reading entries.
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ doc = "Enable/disable reordering read sequence on reading entries")
private boolean bookkeeperClientReorderReadSequenceEnabled = false;
- // Enable bookie isolation by specifying a list of bookie groups to choose
- // from. Any bookie outside the specified groups will not be used by the
- // broker
- @FieldContext(required = false)
+ @FieldContext(
+ category = CATEGORY_STORAGE_BK,
+ required = false,
+ doc = "Enable bookie isolation by specifying a list of bookie groups to choose from. \n\n"
+ + "Any bookie outside the specified groups will not be used by the broker")
private String bookkeeperClientIsolationGroups;
/**** --- Managed Ledger --- ****/
- // Number of bookies to use when creating a ledger
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of bookies to use when creating a ledger"
+ )
private int managedLedgerDefaultEnsembleSize = 1;
- // Number of copies to store for each message
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of copies to store for each message"
+ )
private int managedLedgerDefaultWriteQuorum = 1;
- // Number of guaranteed copies (acks to wait before write is complete)
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of guaranteed copies (acks to wait before write is complete)"
+ )
private int managedLedgerDefaultAckQuorum = 1;
- // Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
- // Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
+ //
+ //
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Default type of checksum to use when writing to BookKeeper. \n\nDefault is `CRC32C`."
+ + " Other possible options are `CRC32`, `MAC` or `DUMMY` (no checksum)."
+ )
private DigestType managedLedgerDigestType = DigestType.CRC32C;
- // Max number of bookies to use when creating a ledger
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of bookies to use when creating a ledger"
+ )
private int managedLedgerMaxEnsembleSize = 5;
- // Max number of copies to store for each message
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of copies to store for each message"
+ )
private int managedLedgerMaxWriteQuorum = 5;
- // Max number of guaranteed copies (acks to wait before write is complete)
- @FieldContext(minValue = 1)
+ @FieldContext(
+ minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of guaranteed copies (acks to wait before write is complete)"
+ )
private int managedLedgerMaxAckQuorum = 5;
- // Amount of memory to use for caching data payload in managed ledger. This
- // memory
- // is allocated from JVM direct memory and it's shared across all the topics
- // running in the same broker
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
+ + " memory is allocated from JVM direct memory and it's shared across all the topics"
+ + " running in the same broker")
private int managedLedgerCacheSizeMB = 1024;
- // Threshold to which bring down the cache level when eviction is triggered
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Threshold to which bring down the cache level when eviction is triggered"
+ )
private double managedLedgerCacheEvictionWatermark = 0.9f;
- // Rate limit the amount of writes per second generated by consumer acking the messages
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Rate limit the amount of writes per second generated by consumer acking the messages"
+ )
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
- // Number of threads to be used for managed ledger tasks dispatching
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of threads to be used for managed ledger tasks dispatching"
+ )
private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors();
- // Number of threads to be used for managed ledger scheduled tasks
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Number of threads to be used for managed ledger scheduled tasks"
+ )
private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors();
- // Max number of entries to append to a ledger before triggering a rollover
- // A ledger rollover is triggered on these conditions Either the max
- // rollover time has been reached or max entries have been written to the
- // ledged and at least min-time has passed
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of entries to append to a ledger before triggering a rollover.\n\n"
+ + "A ledger rollover is triggered on these conditions Either the max"
+ + " rollover time has been reached or max entries have been written to the"
+ + " ledged and at least min-time has passed")
private int managedLedgerMaxEntriesPerLedger = 50000;
- // Minimum time between ledger rollover for a topic
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Minimum time between ledger rollover for a topic"
+ )
private int managedLedgerMinLedgerRolloverTimeMinutes = 10;
- // Maximum time before forcing a ledger rollover for a topic
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Maximum time before forcing a ledger rollover for a topic"
+ )
private int managedLedgerMaxLedgerRolloverTimeMinutes = 240;
- // Delay between a ledger being successfully offloaded to long term storage
- // and the ledger being deleted from bookkeeper
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Delay between a ledger being successfully offloaded to long term storage,"
+ + " and the ledger being deleted from bookkeeper"
+ )
private long managedLedgerOffloadDeletionLagMs = TimeUnit.HOURS.toMillis(4);
- // Max number of entries to append to a cursor ledger
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of entries to append to a cursor ledger"
+ )
private int managedLedgerCursorMaxEntriesPerLedger = 50000;
- // Max time before triggering a rollover on a cursor ledger
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max time before triggering a rollover on a cursor ledger"
+ )
private int managedLedgerCursorRolloverTimeInSeconds = 14400;
- // Max number of "acknowledgment holes" that are going to be persistently stored.
- // When acknowledging out of order, a consumer will leave holes that are supposed
- // to be quickly filled by acking all the messages. The information of which
- // messages are acknowledged is persisted by compressing in "ranges" of messages
- // that were acknowledged. After the max number of ranges is reached, the information
- // will only be tracked in memory and messages will be redelivered in case of
- // crashes.
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of `acknowledgment holes` that are going to be persistently stored.\n\n"
+ + "When acknowledging out of order, a consumer will leave holes that are supposed"
+ + " to be quickly filled by acking all the messages. The information of which"
+ + " messages are acknowledged is persisted by compressing in `ranges` of messages"
+ + " that were acknowledged. After the max number of ranges is reached, the information"
+ + " will only be tracked in memory and messages will be redelivered in case of"
+ + " crashes.")
private int managedLedgerMaxUnackedRangesToPersist = 10000;
- // Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
- // than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
- // zookeeper.
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "Max number of `acknowledgment holes` that can be stored in Zookeeper.\n\n"
+ + "If number of unack message range is higher than this limit then broker will persist"
+ + " unacked ranges into bookkeeper to avoid additional data overhead into zookeeper.")
private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;
- // Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
- // corrupted at bookkeeper and managed-cursor is stuck at that ledger.
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_STORAGE_ML,
+ doc = "Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.\n\n"
+ + " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger."
+ )
private boolean autoSkipNonRecoverableData = false;
- // operation timeout while updating managed-ledger metadata.
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
+ doc = "operation timeout while updating managed-ledger metadata."
+ )
private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
/*** --- Load balancer --- ****/
- // Enable load balancer
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Enable load balancer"
+ )
private boolean loadBalancerEnabled = true;
- // load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)
@Deprecated
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ deprecated = true,
+ doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
+ )
private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection
- // Percentage of change to trigger load report update
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Percentage of change to trigger load report update"
+ )
private int loadBalancerReportUpdateThresholdPercentage = 10;
- // maximum interval to update load report
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum interval to update load report"
+ )
private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
- // Frequency of report to collect
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Frequency of report to collect, in minutes"
+ )
private int loadBalancerHostUsageCheckIntervalMinutes = 1;
- // Enable/disable automatic bundle unloading for load-shedding
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Enable/disable automatic bundle unloading for load-shedding"
+ )
private boolean loadBalancerSheddingEnabled = true;
- // Load shedding interval. Broker periodically checks whether some traffic should be offload from some over-loaded
- // broker to other under-loaded brokers
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Load shedding interval. \n\nBroker periodically checks whether some traffic"
+ + " should be offload from some over-loaded broker to other under-loaded brokers"
+ )
private int loadBalancerSheddingIntervalMinutes = 1;
- // Prevent the same topics to be shed and moved to other broker more that
- // once within this timeframe
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Prevent the same topics to be shed and moved to other broker more that"
+ + " once within this timeframe"
+ )
private long loadBalancerSheddingGracePeriodMinutes = 30;
- // Usage threshold to determine a broker as under-loaded (only used by SimpleLoadManagerImpl)
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ deprecated = true,
+ doc = "Usage threshold to determine a broker as under-loaded (only used by SimpleLoadManagerImpl)"
+ )
@Deprecated
private int loadBalancerBrokerUnderloadedThresholdPercentage = 50;
- // Usage threshold to allocate max number of topics to broker
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Usage threshold to allocate max number of topics to broker"
+ )
private int loadBalancerBrokerMaxTopics = 50000;
- // Usage threshold to determine a broker as over-loaded
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Usage threshold to determine a broker as over-loaded"
+ )
private int loadBalancerBrokerOverloadedThresholdPercentage = 85;
- // Interval to flush dynamic resource quota to ZooKeeper
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Interval to flush dynamic resource quota to ZooKeeper"
+ )
private int loadBalancerResourceQuotaUpdateIntervalMinutes = 15;
- // Usage threshold to determine a broker is having just right level of load (only used by SimpleLoadManagerImpl)
@Deprecated
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ deprecated = true,
+ doc = "Usage threshold to determine a broker is having just right level of load"
+ + " (only used by SimpleLoadManagerImpl)"
+ )
private int loadBalancerBrokerComfortLoadLevelPercentage = 65;
- // enable/disable automatic namespace bundle split
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "enable/disable automatic namespace bundle split"
+ )
private boolean loadBalancerAutoBundleSplitEnabled = true;
- // enable/disable automatic unloading of split bundles
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "enable/disable automatic unloading of split bundles"
+ )
private boolean loadBalancerAutoUnloadSplitBundlesEnabled = true;
- // maximum topics in a bundle, otherwise bundle split will be triggered
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum topics in a bundle, otherwise bundle split will be triggered"
+ )
private int loadBalancerNamespaceBundleMaxTopics = 1000;
- // maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered"
+ )
private int loadBalancerNamespaceBundleMaxSessions = 1000;
- // maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered"
+ )
private int loadBalancerNamespaceBundleMaxMsgRate = 30000;
- // maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum bandwidth (in + out) in a bundle, otherwise bundle split will be triggered"
+ )
private int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100;
- // maximum number of bundles in a namespace
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "maximum number of bundles in a namespace"
+ )
private int loadBalancerNamespaceMaximumBundles = 128;
- // Name of load manager to use
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Name of load manager to use"
+ )
private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl";
- // Option to override the auto-detected network interfaces max speed
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Option to override the auto-detected network interfaces max speed"
+ )
private Double loadBalancerOverrideBrokerNicSpeedGbps;
/**** --- Replication --- ****/
- // Enable replication metrics
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Enable replication metrics"
+ )
private boolean replicationMetricsEnabled = false;
- // Max number of connections to open for each broker in a remote cluster
- // More connections host-to-host lead to better throughput over high-latency
- // links.
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Max number of connections to open for each broker in a remote cluster.\n\n"
+ + "More connections host-to-host lead to better throughput over high-latency links"
+ )
private int replicationConnectionsPerBroker = 16;
- @FieldContext(required = false)
- // replicator prefix used for replicator producer name and cursor name
+ @FieldContext(
+ required = false,
+ category = CATEGORY_REPLICATION,
+ doc = "replicator prefix used for replicator producer name and cursor name"
+ )
private String replicatorPrefix = "pulsar.repl";
- // Replicator producer queue size;
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Replicator producer queue size"
+ )
private int replicationProducerQueueSize = 1000;
- // @deprecated - Use brokerClientTlsEnabled instead.
@Deprecated
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ deprecated = true,
+ doc = "@deprecated - Use brokerClientTlsEnabled instead."
+ )
private boolean replicationTlsEnabled = false;
- // Enable TLS when talking with other brokers in the same cluster (admin operation) or different clusters (replication)
+ @FieldContext(
+ category = CATEGORY_REPLICATION,
+ doc = "Enable TLS when talking with other brokers in the same cluster (admin operation)"
+ + " or different clusters (replication)"
+ )
private boolean brokerClientTlsEnabled = false;
- // Default message retention time
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default message retention time"
+ )
private int defaultRetentionTimeInMinutes = 0;
- // Default retention size
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default retention size"
+ )
private int defaultRetentionSizeInMB = 0;
- // How often to check pulsar connection is still alive
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "How often to check pulsar connection is still alive"
+ )
private int keepAliveIntervalSeconds = 30;
- // How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)"
+ )
private int brokerServicePurgeInactiveFrequencyInSeconds = 60;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "A comma-separated list of namespaces to bootstrap"
+ )
private List<String> bootstrapNamespaces = new ArrayList<String>();
private Properties properties = new Properties();
- // If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to
- // use only brokers running the latest software version (to minimize impact to bundles)
- @FieldContext(dynamic = true)
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to "
+ + "use only brokers running the latest software version (to minimize impact to bundles)"
+ )
private boolean preferLaterVersions = false;
- // Interval between checks to see if topics with compaction policies need to be compacted
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Interval between checks to see if topics with compaction policies need to be compacted"
+ )
private int brokerServiceCompactionMonitorIntervalInSeconds = 60;
+ @FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "Enforce schema validation on following cases:\n\n"
+ + " - if a producer without a schema attempts to produce to a topic with schema, the producer will be\n"
+ + " failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema.\n"
+ + " if you enable this setting, it will cause non-java clients failed to produce."
+ )
private boolean isSchemaValidationEnforced = false;
+ @FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "The schema storage implementation used by this broker"
+ )
private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+ @FieldContext(
+ category = CATEGORY_SCHEMA,
+ doc = "The list compatibility checkers to be used in schema registry"
+ )
private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
"org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck"
);
/**** --- WebSocket --- ****/
- // Number of IO threads in Pulsar Client used in WebSocket proxy
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Number of IO threads in Pulsar Client used in WebSocket proxy"
+ )
private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();
- // Number of connections per Broker in Pulsar Client used in WebSocket proxy
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy"
+ )
private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();
- // Time in milliseconds that idle WebSocket session times out
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Time in milliseconds that idle WebSocket session times out"
+ )
private int webSocketSessionIdleTimeoutMillis = 300000;
/**** --- Metrics --- ****/
- // If true, export topic level metrics otherwise namespace level
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "If true, export topic level metrics otherwise namespace level"
+ )
private boolean exposeTopicLevelMetricsInPrometheus = true;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "If true, export consumer level metrics otherwise namespace level"
+ )
private boolean exposeConsumerLevelMetricsInPrometheus = false;
/**** --- Functions --- ****/
+ @FieldContext(
+ category = CATEGORY_FUNCTIONS,
+ doc = "Flag indicates enabling or disabling function worker on brokers"
+ )
private boolean functionsWorkerEnabled = false;
/**** --- Broker Web Stats --- ****/
- // If true, export publisher stats when returning topics stats from the admin rest api
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "If true, export publisher stats when returning topics stats from the admin rest api"
+ )
private boolean exposePublisherStats = true;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "Stats update frequency in seconds"
+ )
private int statsUpdateFrequencyInSecs = 60;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "Stats update initial delay in seconds"
+ )
private int statsUpdateInitialDelayInSecs = 60;
/**** --- Ledger Offloading --- ****/
@@ -526,13 +1039,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
* NOTES: all implementation related settings should be put in implementation package.
* only common settings like driver name, io threads can be added here.
****/
- // The directory to locate offloaders
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "The directory to locate offloaders"
+ )
private String offloadersDirectory = "./offloaders";
- // Driver to use to offload old data to long term storage
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Driver to use to offload old data to long term storage"
+ )
private String managedLedgerOffloadDriver = null;
- // Maximum number of thread pool threads for ledger offloading
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Maximum number of thread pool threads for ledger offloading"
+ )
private int managedLedgerOffloadMaxThreads = 2;
/**
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index df0a1ec..aad9a2f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
@@ -180,8 +181,10 @@ public class PulsarConfigurationLoader {
try {
Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName());
confField.setAccessible(true);
- convertedConfField.setAccessible(true);
- convertedConfField.set(convertedConf, confField.get(conf));
+ if (!Modifier.isStatic(convertedConfField.getModifiers())) {
+ convertedConfField.setAccessible(true);
+ convertedConfField.set(convertedConf, confField.get(conf));
+ }
} catch (NoSuchFieldException e) {
if (!ignoreNonExistMember) {
throw new IllegalArgumentException("Exception caused while converting configuration: " + e.getMessage());