You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by li...@apache.org on 2022/09/11 12:37:24 UTC
[pulsar] branch master updated: [feature][doc] Generate client config docs from source code (#17198)
This is an automated email from the ASF dual-hosted git repository.
liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1a638aad396 [feature][doc] Generate client config docs from source code (#17198)
1a638aad396 is described below
commit 1a638aad39694134f94179b6bec503a061906143
Author: Mercurio <32...@users.noreply.github.com>
AuthorDate: Sun Sep 11 20:37:10 2022 +0800
[feature][doc] Generate client config docs from source code (#17198)
---
.../pulsar/broker/BaseGenerateDocumentation.java | 42 ++--
pulsar-client/pom.xml | 7 +
.../client/impl/conf/ClientConfigurationData.java | 1 +
.../client/impl/conf/CmdGenerateDocumentation.java | 51 +++++
.../impl/conf/ConsumerConfigurationData.java | 225 +++++++++++++++++++++
.../impl/conf/ProducerConfigurationData.java | 108 ++++++++++
.../client/impl/conf/ReaderConfigurationData.java | 78 +++++++
7 files changed, 500 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BaseGenerateDocumentation.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BaseGenerateDocumentation.java
index 376ba3bf517..0db45442460 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BaseGenerateDocumentation.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BaseGenerateDocumentation.java
@@ -97,6 +97,16 @@ public abstract class BaseGenerateDocumentation {
return fieldContext.deprecated();
};
+ /**
+  * Matches fields whose {@link ApiModelProperty} annotation is flagged {@code required = true}.
+  * Fields that do not carry the annotation never match, so the predicate can be applied to an
+  * unfiltered field list without throwing.
+  */
+ protected Predicate<Field> isRequiredApiModel = field -> {
+     ApiModelProperty modelProperty = field.getAnnotation(ApiModelProperty.class);
+     // getAnnotation returns null when the annotation is absent; treat that as "not required".
+     return modelProperty != null && modelProperty.required();
+ };
+
+ /**
+  * Matches fields whose {@link ApiModelProperty} annotation is present and not flagged as required.
+  * Fields that do not carry the annotation never match, so the predicate can be applied to an
+  * unfiltered field list without throwing.
+  */
+ protected Predicate<Field> isOptionalApiModel = field -> {
+     ApiModelProperty modelProperty = field.getAnnotation(ApiModelProperty.class);
+     // getAnnotation returns null when the annotation is absent; such fields are not documented at all.
+     return modelProperty != null && !modelProperty.required();
+ };
+
protected void writeDocListByFieldContext(List<Field> fieldList, StringBuilder sb, Object obj) throws Exception {
for (Field field : fieldList) {
FieldContext fieldContext = field.getAnnotation(FieldContext.class);
@@ -104,12 +114,26 @@ public abstract class BaseGenerateDocumentation {
sb.append("### ").append(field.getName()).append("\n");
sb.append(fieldContext.doc().replace(">", "\\>")).append("\n\n");
+ sb.append("**Type**: `").append(field.getType().getCanonicalName()).append("`\n\n");
sb.append("**Default**: `").append(field.get(obj)).append("`\n\n");
sb.append("**Dynamic**: `").append(fieldContext.dynamic()).append("`\n\n");
sb.append("**Category**: ").append(fieldContext.category()).append("\n\n");
}
}
+ /**
+  * Appends one Markdown section per {@link ApiModelProperty}-annotated field to {@code sb}.
+  *
+  * <p>Each section consists of a {@code ###} heading (the annotation's {@code name}, or the Java
+  * field name when that is blank), the annotation's {@code value} text with {@code >} escaped,
+  * the field's declared type, and the value the field currently holds in {@code obj}.
+  *
+  * @param fieldList fields to document; entries without the annotation are skipped
+  * @param sb builder that receives the generated Markdown
+  * @param obj instance whose current field values are reported as the defaults
+  * @throws Exception if a field value cannot be read reflectively
+  */
+ protected void writeDocListByApiModel(List<Field> fieldList, StringBuilder sb, Object obj) throws Exception {
+     for (Field field : fieldList) {
+         ApiModelProperty modelProperty = field.getAnnotation(ApiModelProperty.class);
+         if (modelProperty == null) {
+             // Unannotated fields carry no doc metadata; skip them instead of failing with an NPE.
+             continue;
+         }
+         // Configuration fields are private, so reflective reads need the access check lifted.
+         field.setAccessible(true);
+
+         String name = StringUtils.isBlank(modelProperty.name()) ? field.getName() : modelProperty.name();
+         sb.append("### ").append(name).append("\n");
+         sb.append(modelProperty.value().replace(">", "\\>")).append("\n\n");
+         sb.append("**Type**: `").append(field.getType().getCanonicalName()).append("`\n\n");
+         sb.append("**Default**: `").append(field.get(obj)).append("`\n\n");
+     }
+ }
+
protected static class CategoryComparator implements Comparator<Field> {
@Override
public int compare(Field o1, Field o2) {
@@ -161,23 +185,17 @@ public abstract class BaseGenerateDocumentation {
Field[] fields = clazz.getDeclaredFields();
ArrayList<Field> fieldList = new ArrayList<>(Arrays.asList(fields));
+ fieldList.removeIf(f -> f.getAnnotation(ApiModelProperty.class) == null);
fieldList.sort(Comparator.comparing(Field::getName));
+ List<Field> requiredFields = fieldList.stream().filter(isRequiredApiModel).toList();
+ List<Field> optionalFields = fieldList.stream().filter(isOptionalApiModel).toList();
sb.append("# ").append(type).append("\n");
sb.append(prefix).append(className).append(suffix);
+ sb.append("## Required\n");
+ writeDocListByApiModel(requiredFields, sb, obj);
sb.append("## Optional\n");
- for (Field field : fieldList) {
- ApiModelProperty fieldContext = field.getAnnotation(ApiModelProperty.class);
- if (fieldContext == null) {
- continue;
- }
- field.setAccessible(true);
-
- String name = StringUtils.isBlank(fieldContext.name()) ? field.getName() : fieldContext.name();
- sb.append("### ").append(name).append("\n");
- sb.append(fieldContext.value().replace(">", "\\>")).append("\n\n");
- sb.append("**Default**: `").append(field.get(obj)).append("`\n\n");
- }
+ writeDocListByApiModel(optionalFields, sb, obj);
return sb.toString();
}
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 8cf75e89f52..c0b21c64e6d 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -45,6 +45,13 @@
<version>${project.parent.version}</version>
</dependency>
+ <!-- Provides org.apache.pulsar.broker.BaseGenerateDocumentation, which CmdGenerateDocumentation extends. -->
+ <!-- NOTE(review): compile scope presumably exposes pulsar-broker-common to every consumer of pulsar-client; confirm this is intended. -->
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-broker-common</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bouncy-castle-bc</artifactId>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 53a16be6901..e6f25f4acf1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -50,6 +50,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
@ApiModelProperty(
name = "serviceUrl",
+ required = true,
value = "Pulsar cluster HTTP URL to connect to a broker."
)
private String serviceUrl;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/CmdGenerateDocumentation.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/CmdGenerateDocumentation.java
new file mode 100644
index 00000000000..14059c0db64
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/CmdGenerateDocumentation.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import com.beust.jcommander.Parameters;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BaseGenerateDocumentation;
+
+/**
+ * Command-line entry point that generates configuration documentation for the client-side
+ * configuration data classes (client, producer, consumer and reader).
+ */
+@Data
+@Parameters(commandDescription = "Generate documentation automatically.")
+@Slf4j
+public class CmdGenerateDocumentation extends BaseGenerateDocumentation {
+
+    /**
+     * Creates the command and hands the raw arguments to the inherited {@code run} method.
+     *
+     * @param args command-line arguments, passed through unchanged
+     * @throws Exception if the inherited {@code run} method fails
+     */
+    public static void main(String[] args) throws Exception {
+        new CmdGenerateDocumentation().run(args);
+    }
+
+    /**
+     * Generates the documentation for one of the supported configuration classes.
+     *
+     * @param className fully qualified name of the configuration class to document
+     * @return the generated document, or a "not found" message for an unsupported class name
+     * @throws Exception if documentation generation fails
+     */
+    @Override
+    public String generateDocumentByClassName(String className) throws Exception {
+        // Resolve the document title first; anything unrecognised short-circuits below.
+        final String type;
+        if (ClientConfigurationData.class.getName().equals(className)) {
+            type = "Client";
+        } else if (ProducerConfigurationData.class.getName().equals(className)) {
+            type = "Producer";
+        } else if (ConsumerConfigurationData.class.getName().equals(className)) {
+            type = "Consumer";
+        } else if (ReaderConfigurationData.class.getName().equals(className)) {
+            type = "Reader";
+        } else {
+            return "Class [" + className + "] not found";
+        }
+        return generateDocByApiModelProperty(className, type, new StringBuilder());
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index dcde042f4e8..db806c309e8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.conf;
import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Sets;
+import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -55,12 +56,34 @@ import org.apache.pulsar.client.api.SubscriptionType;
public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
+ @ApiModelProperty(
+ name = "topicNames",
+ required = true,
+ value = "Topic name"
+ )
private Set<String> topicNames = new TreeSet<>();
+ @ApiModelProperty(
+ name = "topicsPattern",
+ value = "Topic pattern"
+ )
private Pattern topicsPattern;
+ @ApiModelProperty(
+ name = "subscriptionName",
+ value = "Subscription name"
+ )
private String subscriptionName;
+ @ApiModelProperty(
+ name = "subscriptionType",
+ value = "Subscription type.\n"
+ + "Four subscription types are available:\n"
+ + "* Exclusive\n"
+ + "* Failover\n"
+ + "* Shared\n"
+ + "* Key_Shared"
+ )
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
private Map<String, String> subscriptionProperties;
@@ -73,26 +96,111 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
@JsonIgnore
private ConsumerEventListener consumerEventListener;
+ // The value text is emitted into the generated configuration docs, so adjacent string
+ // fragments need an explicit separating space.
+ @ApiModelProperty(
+     name = "negativeAckRedeliveryBackoff",
+     value = "Interface for custom message is negativeAcked policy. You can specify `RedeliveryBackoff` for a"
+         + " consumer."
+ )
@JsonIgnore
private RedeliveryBackoff negativeAckRedeliveryBackoff;
+ @ApiModelProperty(
+ name = "ackTimeoutRedeliveryBackoff",
+ value = "Interface for custom message is ackTimeout policy. You can specify `RedeliveryBackoff` for a"
+ + " consumer."
+ )
@JsonIgnore
private RedeliveryBackoff ackTimeoutRedeliveryBackoff;
+ @ApiModelProperty(
+ name = "receiverQueueSize",
+ value = "Size of a consumer's receiver queue.\n"
+ + "\n"
+ + "For example, the number of messages accumulated by a consumer before an application calls "
+ + "`Receive`.\n"
+ + "\n"
+ + "A value higher than the default value increases consumer throughput, though at the expense of "
+ + "more memory utilization."
+ )
private int receiverQueueSize = 1000;
+ @ApiModelProperty(
+ name = "acknowledgementsGroupTimeMicros",
+ value = "Group a consumer acknowledgment for a specified time.\n"
+ + "\n"
+ + "By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.\n"
+ + "\n"
+ + "Setting a group time of 0 sends out acknowledgments immediately.\n"
+ + "\n"
+ + "A longer ack group time is more efficient at the expense of a slight increase in message "
+ + "re-deliveries after a failure."
+ )
private long acknowledgementsGroupTimeMicros = TimeUnit.MILLISECONDS.toMicros(100);
+ @ApiModelProperty(
+ name = "negativeAckRedeliveryDelayMicros",
+ value = "Delay to wait before redelivering messages that failed to be processed.\n"
+ + "\n"
+ + "When an application uses {@link Consumer#negativeAcknowledge(Message)}, failed messages are "
+ + "redelivered after a fixed timeout."
+ )
private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1);
+ @ApiModelProperty(
+ name = "maxTotalReceiverQueueSizeAcrossPartitions",
+ value = "The max total receiver queue size across partitions.\n"
+ + "\n"
+ + "This setting reduces the receiver queue size for individual partitions if the total receiver "
+ + "queue size exceeds this value."
+ )
private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+ @ApiModelProperty(
+ name = "consumerName",
+ value = "Consumer name"
+ )
private String consumerName = null;
+ @ApiModelProperty(
+ name = "ackTimeoutMillis",
+ value = "Timeout of unacked messages"
+ )
private long ackTimeoutMillis = 0;
+ // The value text is emitted into the generated configuration docs.
+ @ApiModelProperty(
+     name = "tickDurationMillis",
+     value = "Granularity of the ack-timeout redelivery.\n"
+         + "\n"
+         + "Using a higher `tickDurationMillis` reduces the memory overhead to track messages when setting "
+         + "ack-timeout to a bigger value (for example, 1 hour)."
+ )
private long tickDurationMillis = 1000;
+ @ApiModelProperty(
+ name = "priorityLevel",
+ value = "Priority level for a consumer to which a broker gives more priority while dispatching messages "
+ + "in Shared subscription type.\n"
+ + "\n"
+ + "The broker follows descending priorities. For example, 0=max-priority, 1, 2,...\n"
+ + "\n"
+ + "In Shared subscription type, the broker **first dispatches messages to the max priority level "
+ + "consumers if they have permits**. Otherwise, the broker considers next priority level consumers."
+ + "\n\n"
+ + "**Example 1**\n"
+ + "If a subscription has consumerA with `priorityLevel` 0 and consumerB with `priorityLevel` 1,"
+ + " then the broker **only dispatches messages to consumerA until it runs out permits** and then"
+ + " starts dispatching messages to consumerB.\n"
+ + "\n"
+ + "**Example 2**\n"
+ + "Consumer Priority, Level, Permits\n"
+ + "C1, 0, 2\n"
+ + "C2, 0, 1\n"
+ + "C3, 0, 1\n"
+ + "C4, 1, 2\n"
+ + "C5, 1, 1\n"
+ + "\n"
+ + "Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4."
+ )
private int priorityLevel = 0;
/**
@@ -111,11 +219,27 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
return maxPendingChunkedMessage;
}
+ @ApiModelProperty(
+ name = "maxPendingChunkedMessage",
+ value = "The maximum size of a queue holding pending chunked messages. When the threshold is reached,"
+ + " the consumer drops pending messages to optimize memory utilization."
+ )
// max pending chunked message to avoid sending incomplete message into the queue and memory
private int maxPendingChunkedMessage = 10;
+ // The value text is emitted into the generated configuration docs.
+ @ApiModelProperty(
+     name = "autoAckOldestChunkedMessageOnQueueFull",
+     value = "Whether to automatically acknowledge pending chunked messages when the threshold of"
+         + " `maxPendingChunkedMessage` is reached. If set to `false`, these messages will be redelivered"
+         + " by their broker."
+ )
private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+ @ApiModelProperty(
+ name = "expireTimeOfIncompleteChunkedMessageMillis",
+ value = "The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the"
+ + " specified time period. The default value is 1 minute."
+ )
private long expireTimeOfIncompleteChunkedMessageMillis = TimeUnit.MINUTES.toMillis(1);
@JsonIgnore
@@ -124,18 +248,107 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
@JsonIgnore
private transient MessageCrypto messageCrypto = null;
+ // The value text is emitted into the generated configuration docs as Markdown; each bullet
+ // keeps a space after the bold label so the list renders consistently.
+ @ApiModelProperty(
+     name = "cryptoFailureAction",
+     value = "Consumer should take action when it receives a message that can not be decrypted.\n"
+         + "* **FAIL**: this is the default option to fail messages until crypto succeeds.\n"
+         + "* **DISCARD**: silently acknowledge and not deliver message to an application.\n"
+         + "* **CONSUME**: deliver encrypted messages to applications. It is the application's"
+         + " responsibility to decrypt the message.\n"
+         + "\n"
+         + "The decompression of message fails.\n"
+         + "\n"
+         + "If messages contain batch messages, a client is not able to retrieve individual messages in"
+         + " batch.\n"
+         + "\n"
+         + "Delivered encrypted message contains {@link EncryptionContext} which contains encryption and "
+         + "compression information in it using which application can decrypt consumed message payload."
+ )
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
+ @ApiModelProperty(
+ name = "properties",
+ value = "A name or value property of this consumer.\n"
+ + "\n"
+ + "`properties` is application defined metadata attached to a consumer.\n"
+ + "\n"
+ + "When getting a topic stats, associate this metadata with the consumer stats for easier "
+ + "identification."
+ )
private SortedMap<String, String> properties = new TreeMap<>();
+ @ApiModelProperty(
+ name = "readCompacted",
+ value = "If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than reading "
+ + "a full message backlog of a topic.\n"
+ + "\n"
+ + "A consumer only sees the latest value for each key in the compacted topic, up until reaching "
+ + "the point in the topic message when compacting backlog. Beyond that point, send messages as "
+ + "normal.\n"
+ + "\n"
+ + "Only enabling `readCompacted` on subscriptions to persistent topics, which have a single active "
+ + "consumer (like failure or exclusive subscriptions).\n"
+ + "\n"
+ + "Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions "
+ + "leads to a subscription call throwing a `PulsarClientException`."
+ )
private boolean readCompacted = false;
+ @ApiModelProperty(
+ name = "subscriptionInitialPosition",
+ value = "Initial position at which to set cursor when subscribing to a topic at first time."
+ )
private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+ @ApiModelProperty(
+ name = "patternAutoDiscoveryPeriod",
+ value = "Topic auto discovery period when using a pattern for topic's consumer.\n"
+ + "\n"
+ + "The default and minimum value is 1 minute."
+ )
private int patternAutoDiscoveryPeriod = 60;
+ @ApiModelProperty(
+ name = "regexSubscriptionMode",
+ value = "When subscribing to a topic using a regular expression, you can pick a certain type of topics.\n"
+ + "\n"
+ + "* **PersistentOnly**: only subscribe to persistent topics.\n"
+ + "* **NonPersistentOnly**: only subscribe to non-persistent topics.\n"
+ + "* **AllTopics**: subscribe to both persistent and non-persistent topics."
+ )
private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;
+ @ApiModelProperty(
+ name = "deadLetterPolicy",
+ value = "Dead letter policy for consumers.\n"
+ + "\n"
+ + "By default, some messages are probably redelivered many times, even to the extent that it "
+ + "never stops.\n"
+ + "\n"
+ + "By using the dead letter mechanism, messages have the max redelivery count. **When exceeding the"
+ + " maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged "
+ + "automatically**.\n"
+ + "\n"
+ + "You can enable the dead letter mechanism by setting `deadLetterPolicy`.\n"
+ + "\n"
+ + "**Example**\n"
+ + "```java\n"
+ + "client.newConsumer()\n"
+ + ".deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())\n"
+ + ".subscribe();\n"
+ + "```\n"
+ + "Default dead letter topic name is `{TopicName}-{Subscription}-DLQ`.\n"
+ + "\n"
+ + "To set a custom dead letter topic name:\n"
+ + "```java\n"
+ + "client.newConsumer()\n"
+ + ".deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10)\n"
+ + ".deadLetterTopic(\"your-topic-name\").build())\n"
+ + ".subscribe();\n"
+ + "```\n"
+ + "When specifying the dead letter policy while not specifying `ackTimeoutMillis`, you can set the"
+ + " ack timeout to 30000 millisecond."
+ )
private transient DeadLetterPolicy deadLetterPolicy;
private boolean retryEnable = false;
@@ -143,10 +356,22 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
@JsonIgnore
private BatchReceivePolicy batchReceivePolicy;
+ @ApiModelProperty(
+ name = "autoUpdatePartitions",
+ value = "If `autoUpdatePartitions` is enabled, a consumer subscribes to partition increasement "
+ + "automatically.\n"
+ + "\n"
+ + "**Note**: this is only for partitioned consumers."
+ )
private boolean autoUpdatePartitions = true;
private long autoUpdatePartitionsIntervalSeconds = 60;
+ @ApiModelProperty(
+ name = "replicateSubscriptionState",
+ value = "If `replicateSubscriptionState` is enabled, a subscription state is replicated to geo-replicated"
+ + " clusters."
+ )
private boolean replicateSubscriptionState = false;
private boolean resetIncludeHead = false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index c5d7b37d4ee..ec1de3d473f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.conf;
import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Sets;
+import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Set;
import java.util.SortedMap;
@@ -52,27 +53,120 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
public static final int DEFAULT_MAX_PENDING_MESSAGES = 0;
public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 0;
+ @ApiModelProperty(
+ name = "topicName",
+ required = true,
+ value = "Topic name"
+ )
private String topicName = null;
+
+ @ApiModelProperty(
+ name = "producerName",
+ value = "Producer name"
+ )
private String producerName = null;
+
+ @ApiModelProperty(
+ name = "sendTimeoutMs",
+ value = "Message send timeout in ms.\n"
+ + "If a message is not acknowledged by a server before the `sendTimeout` expires, an error occurs."
+ )
private long sendTimeoutMs = 30000;
+
+ @ApiModelProperty(
+ name = "blockIfQueueFull",
+ value = "If it is set to `true`, when the outgoing message queue is full, the `Send` and `SendAsync`"
+ + " methods of producer block, rather than failing and throwing errors.\n"
+ + "If it is set to `false`, when the outgoing message queue is full, the `Send` and `SendAsync`"
+ + " methods of producer fail and `ProducerQueueIsFullError` exceptions occur.\n"
+ + "\n"
+ + "The `MaxPendingMessages` parameter determines the size of the outgoing message queue."
+ )
private boolean blockIfQueueFull = false;
+
+ @ApiModelProperty(
+ name = "maxPendingMessages",
+ value = "The maximum size of a queue holding pending messages.\n"
+ + "\n"
+ + "For example, a message waiting to receive an acknowledgment from a [broker]"
+ + "(https://pulsar.apache.org/docs/reference-terminology#broker).\n"
+ + "\n"
+ + "By default, when the queue is full, all calls to the `Send` and `SendAsync` methods fail"
+ + " **unless** you set `BlockIfQueueFull` to `true`."
+ )
private int maxPendingMessages = DEFAULT_MAX_PENDING_MESSAGES;
+
+ @ApiModelProperty(
+ name = "maxPendingMessagesAcrossPartitions",
+ value = "The maximum number of pending messages across partitions.\n"
+ + "\n"
+ + "Use the setting to lower the max pending messages for each partition ({@link "
+ + "#setMaxPendingMessages(int)}) if the total number exceeds the configured value."
+ )
private int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+
+ @ApiModelProperty(
+ name = "messageRoutingMode",
+ value = "Message routing logic for producers on [partitioned topics]"
+ + "(https://pulsar.apache.org/docs/concepts-architecture-overview#partitioned-topics).\n"
+ + "Apply the logic only when setting no key on messages.\n"
+ + "Available options are as follows:\n"
+ + "* `pulsar.RoundRobinDistribution`: round robin\n"
+ + "* `pulsar.UseSinglePartition`: publish all messages to a single partition\n"
+ + "* `pulsar.CustomPartition`: a custom partitioning scheme"
+ )
private MessageRoutingMode messageRoutingMode = null;
+
+ // The value text is emitted into the generated configuration docs as Markdown, so adjacent
+ // string fragments need an explicit separating space before a link.
+ @ApiModelProperty(
+     name = "hashingScheme",
+     value = "Hashing function determining the partition where you publish a particular message (partitioned "
+         + "topics only).\n"
+         + "Available options are as follows:\n"
+         + "* `pulsar.JavaStringHash`: the equivalent of `string.hashCode()` in Java\n"
+         + "* `pulsar.Murmur3_32Hash`: applies the [Murmur3](https://en.wikipedia.org/wiki/MurmurHash)"
+         + " hashing function\n"
+         + "* `pulsar.BoostHash`: applies the hashing function from C++'s"
+         + " [Boost](https://www.boost.org/doc/libs/1_62_0/doc/html/hash.html) library"
+ )
private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
+ @ApiModelProperty(
+ name = "cryptoFailureAction",
+ value = "Producer should take action when encryption fails.\n"
+ + "* **FAIL**: if encryption fails, unencrypted messages fail to send.\n"
+ + "* **SEND**: if encryption fails, unencrypted messages are sent."
+ )
private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
@JsonIgnore
private MessageRouter customMessageRouter = null;
+ @ApiModelProperty(
+ name = "batchingMaxPublishDelayMicros",
+ value = "Batching time period of sending messages."
+ )
private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
private int batchingPartitionSwitchFrequencyByPublishDelay = 10;
+
+ @ApiModelProperty(
+ name = "batchingMaxMessages",
+ value = "The maximum number of messages permitted in a batch."
+ )
private int batchingMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES;
private int batchingMaxBytes = 128 * 1024; // 128KB (keep the maximum consistent as previous versions)
+
+ @ApiModelProperty(
+ name = "batchingEnabled",
+ value = "Enable batching of messages."
+ )
private boolean batchingEnabled = true; // enabled by default
@JsonIgnore
private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
+
+ @ApiModelProperty(
+ name = "chunkingEnabled",
+ value = "Enable chunking of messages."
+ )
private boolean chunkingEnabled = false;
private int chunkMaxMessageSize = -1;
@@ -85,6 +179,15 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
@JsonIgnore
private Set<String> encryptionKeys = new TreeSet<>();
+ @ApiModelProperty(
+ name = "compressionType",
+ value = "Message data compression type used by a producer.\n"
+ + "Available options:\n"
+ + "* [LZ4](https://github.com/lz4/lz4)\n"
+ + "* [ZLIB](https://zlib.net/)\n"
+ + "* [ZSTD](https://facebook.github.io/zstd/)\n"
+ + "* [SNAPPY](https://google.github.io/snappy/)"
+ )
private CompressionType compressionType = CompressionType.NONE;
// Cannot use Optional<Long> since it's not serializable
@@ -102,6 +205,11 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
private SortedMap<String, String> properties = new TreeMap<>();
+ @ApiModelProperty(
+ name = "initialSubscriptionName",
+ value = "Use this configuration to automatically create an initial subscription when creating a topic."
+ + " If this field is not set, the initial subscription is not created."
+ )
private String initialSubscriptionName = null;
/**
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index c33cbf186cb..689759cb995 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl.conf;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
@@ -37,6 +38,11 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
+ @ApiModelProperty(
+ name = "topicNames",
+ required = true,
+ value = "Topic name"
+ )
private Set<String> topicNames = new HashSet<>();
@JsonIgnore
@@ -45,18 +51,90 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
@JsonIgnore
private long startMessageFromRollbackDurationInSec;
+ @ApiModelProperty(
+ name = "receiverQueueSize",
+ value = "Size of a consumer's receiver queue.\n"
+ + "\n"
+ + "For example, the number of messages that can be accumulated by a consumer before an "
+ + "application calls `Receive`.\n"
+ + "\n"
+ + "A value higher than the default value increases consumer throughput, though at the expense of "
+ + "more memory utilization."
+ )
private int receiverQueueSize = 1000;
+ @ApiModelProperty(
+ name = "readerListener",
+ value = "A listener that is called for message received."
+ )
private ReaderListener<T> readerListener;
+ @ApiModelProperty(
+ name = "readerName",
+ value = "Reader name"
+ )
private String readerName = null;
+
+ @ApiModelProperty(
+ name = "subscriptionRolePrefix",
+ value = "Prefix of subscription role."
+ )
private String subscriptionRolePrefix = null;
+
+ @ApiModelProperty(
+ name = "subscriptionName",
+ value = "Subscription name"
+ )
private String subscriptionName = null;
+ @ApiModelProperty(
+ name = "cryptoKeyReader",
+ value = "Interface that abstracts the access to a key store."
+ )
private CryptoKeyReader cryptoKeyReader = null;
+
+ // The value text is emitted into the generated configuration docs as Markdown.
+ @ApiModelProperty(
+     name = "cryptoFailureAction",
+     value = "Consumer should take action when it receives a message that can not be decrypted.\n"
+         + "* **FAIL**: this is the default option to fail messages until crypto succeeds.\n"
+         + "* **DISCARD**: silently acknowledge and not deliver message to an application.\n"
+         + "* **CONSUME**: deliver encrypted messages to applications. It is the application's"
+         + " responsibility to decrypt the message.\n"
+         + "\n"
+         + "The message decompression fails.\n"
+         + "\n"
+         + "If messages contain batch messages, a client is not able to retrieve individual messages in"
+         + " batch.\n"
+         + "\n"
+         + "Delivered encrypted message contains {@link EncryptionContext} which contains encryption and "
+         + "compression information in it using which application can decrypt consumed message payload."
+ )
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
+ @ApiModelProperty(
+ name = "readCompacted",
+ value = "If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than a full "
+ + "message backlog of a topic.\n"
+ + "\n"
+ + "A consumer only sees the latest value for each key in the compacted topic, up until reaching "
+ + "the point in the topic message when compacting backlog. Beyond that point, send messages as "
+ + "normal.\n"
+ + "\n"
+ + "`readCompacted` can only be enabled on subscriptions to persistent topics, which have a single "
+ + "active consumer (for example, failure or exclusive subscriptions).\n"
+ + "\n"
+ + "Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions "
+ + "leads to a subscription call throwing a `PulsarClientException`."
+ )
private boolean readCompacted = false;
+
+ @ApiModelProperty(
+ name = "resetIncludeHead",
+ value = "If set to true, the first message to be returned is the one specified by `messageId`.\n"
+ + "\n"
+ + "If set to false, the first message to be returned is the one next to the message specified by "
+ + "`messageId`."
+ )
private boolean resetIncludeHead = false;
private transient List<Range> keyHashRanges;