You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/14 19:40:41 UTC
[camel] 01/02: Increase configuration options for Pulsar
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit c56005d487c0a55e0e375de7c556f82504f5527e
Author: Sherman Richard <sh...@thehutgroup.com>
AuthorDate: Tue Aug 13 11:10:40 2019 +0100
Increase configuration options for Pulsar
---
.../camel/component/pulsar/PulsarProducer.java | 20 ++-
.../pulsar/configuration/PulsarConfiguration.java | 147 ++++++++++++++++++++-
2 files changed, 162 insertions(+), 5 deletions(-)
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 06ac5e7..c425400 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,17 +16,18 @@
*/
package org.apache.camel.component.pulsar;
-import java.util.Map;
-
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConversionException;
+import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
import org.apache.camel.impl.DefaultProducer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import java.util.concurrent.TimeUnit;
+
public class PulsarProducer extends DefaultProducer {
private final PulsarEndpoint pulsarEndpoint;
@@ -55,7 +56,8 @@ public class PulsarProducer extends DefaultProducer {
private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
if (producer == null) {
final String topicUri = pulsarEndpoint.getTopicUri();
- String producerName = pulsarEndpoint.getPulsarConfiguration().getProducerName();
+ PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+ String producerName = configuration.getProducerName();
if (producerName == null) {
producerName = topicUri + "-" + Thread.currentThread().getId();
}
@@ -63,7 +65,17 @@ public class PulsarProducer extends DefaultProducer {
.getPulsarClient()
.newProducer()
.producerName(producerName)
- .topic(topicUri);
+ .topic(topicUri)
+ .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+ .blockIfQueueFull(configuration.isBlockIfQueueFull())
+ .maxPendingMessages(configuration.getMaxPendingMessages())
+ .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+ .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+ .batchingMaxMessages(configuration.getBatchingMaxMessages()) // batch size option, not the pending-queue size
+ .enableBatching(configuration.isBatchingEnabled())
+ .initialSequenceId(configuration.getInitialSequenceId())
+ .compressionType(configuration.getCompressionType());
+
producer = producerBuilder.create();
}
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 3c174bb..95441d6 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -19,13 +19,16 @@ package org.apache.camel.component.pulsar.configuration;
import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
+import org.apache.pulsar.client.api.CompressionType;
+
+import java.util.concurrent.TimeUnit;
import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
@UriParams
public class PulsarConfiguration {
- @UriParam(label = "consumer", defaultValue = "subscription")
+ @UriParam(label = "consumer", defaultValue = "subs")
private String subscriptionName = "subs";
@UriParam(label = "consumer", enums = "EXCLUSIVE, SHARED, FAILOVER", defaultValue = "EXCLUSIVE")
private SubscriptionType subscriptionType = EXCLUSIVE;
@@ -39,6 +42,25 @@ public class PulsarConfiguration {
private String producerName = "default-producer";
@UriParam(label = "consumer", defaultValue = "cons")
private String consumerNamePrefix = "cons";
+ @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
+ private int sendTimeoutMs = 30000;
+ @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
+ private boolean blockIfQueueFull = false;
+ @UriParam(label = "producer", description = "Size of the pending messages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
+ private int maxPendingMessages = 1000;
+ @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+ private int maxPendingMessagesAcrossPartitions = 50000;
+ @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
+ private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
+ @UriParam(label = "producer", description = "The maximum size to batch messages.", defaultValue = "1000")
+ private int batchingMaxMessages = 1000;
+ @UriParam(label = "producer", description = "Control whether automatic batching of messages is enabled for the producer.", defaultValue = "true")
+ private boolean batchingEnabled = true;
+ @UriParam(label = "producer", description = "The first message published will have a sequence Id of initialSequenceId + 1.", defaultValue = "-1")
+ private long initialSequenceId = -1;
+ @UriParam(label = "producer", description = "Compression type to use, defaults to NONE from [NONE, LZ4, ZLIB]", defaultValue = "NONE")
+ private CompressionType compressionType = CompressionType.NONE;
+
public String getSubscriptionName() {
return subscriptionName;
@@ -116,4 +138,127 @@ public class PulsarConfiguration {
public void setConsumerNamePrefix(String consumerNamePrefix) {
this.consumerNamePrefix = consumerNamePrefix;
}
+
+ /**
+ * Send timeout in milliseconds.
+ * Defaults to 30,000ms (30 seconds)
+ */
+ public void setSendTimeoutMs(int sendTimeoutMs) {
+ this.sendTimeoutMs = sendTimeoutMs;
+ }
+
+ public int getSendTimeoutMs() {
+ return sendTimeoutMs;
+ }
+
+ /**
+ * Set whether the send and asyncSend operations should block when the outgoing message queue is full.
+ * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left
+ * in the pending queue.
+ * Default is false.
+ */
+ public void setBlockIfQueueFull(boolean blockIfQueueFull) {
+ this.blockIfQueueFull = blockIfQueueFull;
+ }
+
+ public boolean isBlockIfQueueFull() {
+ return blockIfQueueFull;
+ }
+
+ /**
+ * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+ * Default is 1000.
+ */
+ public void setMaxPendingMessages(int maxPendingMessages) {
+ this.maxPendingMessages = maxPendingMessages;
+ }
+
+ public int getMaxPendingMessages() {
+ return maxPendingMessages;
+ }
+
+ /**
+ * Set the number of max pending messages across all the partitions.
+ * Default is 50000.
+ */
+ public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+ this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+ }
+
+ public int getMaxPendingMessagesAcrossPartitions() {
+ return maxPendingMessagesAcrossPartitions;
+ }
+
+ /**
+ * Set the time period, in microseconds, within which the messages sent will be batched if
+ * batching is enabled. If set to a non-zero value, messages will be queued until either:
+ * <ul>
+ * <li>this time interval expires</li>
+ * <li>the max number of messages in a batch is reached</li>
+ * </ul>
+ * Default is 1000 microseconds (1ms).
+ */
+ public void setBatchingMaxPublishDelayMicros(long batchingMaxPublishDelayMicros) {
+ this.batchingMaxPublishDelayMicros = batchingMaxPublishDelayMicros;
+ }
+
+ public long getBatchingMaxPublishDelayMicros() {
+ return batchingMaxPublishDelayMicros;
+ }
+
+ /**
+ * Set the maximum number of messages permitted in a batch.
+ * Default 1,000.
+ */
+ public void setBatchingMaxMessages(int batchingMaxMessages) {
+ this.batchingMaxMessages = batchingMaxMessages;
+ }
+
+ public int getBatchingMaxMessages() {
+ return batchingMaxMessages;
+ }
+
+ /**
+ * Control whether automatic batching of messages is enabled for the producer.
+ * Default is true.
+ */
+ public void setBatchingEnabled(boolean batchingEnabled) {
+ this.batchingEnabled = batchingEnabled;
+ }
+
+ public boolean isBatchingEnabled() {
+ return batchingEnabled;
+ }
+
+ /**
+ * Set the baseline for the sequence ids for messages published by the producer.
+ * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned
+ * incremental sequence ids, if not otherwise specified.
+ */
+ public void setInitialSequenceId(long initialSequenceId) {
+ this.initialSequenceId = initialSequenceId;
+ }
+
+ public long getInitialSequenceId() {
+ return initialSequenceId;
+ }
+
+ /**
+ * Set the compression type for the producer. The name is matched case-insensitively
+ * (upper-cased with Locale.ROOT so the result does not depend on the default locale).
+ * Supported compression types are:
+ * <ul>
+ * <li>NONE: No compression</li>
+ * <li>LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib</li>
+ * <li>ZLIB: Standard ZLib compression</li>
+ * </ul>
+ * Default is NONE
+ */
+ public void setCompressionType(String compressionType) {
+ this.compressionType = CompressionType.valueOf(compressionType.toUpperCase(java.util.Locale.ROOT));
+ }
+
+ public CompressionType getCompressionType() {
+ return compressionType;
+ }
}