You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/14 13:20:32 UTC

[camel] 01/04: Increase configuration options for Pulsar

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 722298e85027de6a0e64d6d731653bc0f32b3f9c
Author: Sherman Richard <sh...@thehutgroup.com>
AuthorDate: Tue Aug 13 13:33:10 2019 +0100

    Increase configuration options for Pulsar
---
 .../camel/component/pulsar/PulsarProducer.java     |  17 ++-
 .../pulsar/configuration/PulsarConfiguration.java  | 144 +++++++++++++++++++++
 2 files changed, 159 insertions(+), 2 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 70f1d40..140f0c7 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -20,11 +20,14 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
+import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 
+import java.util.concurrent.TimeUnit;
+
 public class PulsarProducer extends DefaultProducer {
 
     private final PulsarEndpoint pulsarEndpoint;
@@ -52,7 +55,8 @@ public class PulsarProducer extends DefaultProducer {
     private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
         if (producer == null) {
             final String topicUri = pulsarEndpoint.getUri();
-            String producerName = pulsarEndpoint.getPulsarConfiguration().getProducerName();
+            PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+            String producerName = configuration.getProducerName();
             if (producerName == null) {
                 producerName = topicUri + "-" + Thread.currentThread().getId();
             }
@@ -60,7 +64,16 @@ public class PulsarProducer extends DefaultProducer {
                     .getPulsarClient()
                     .newProducer()
                     .producerName(producerName)
-                    .topic(topicUri);
+                    .topic(topicUri)
+                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
+                    .maxPendingMessages(configuration.getMaxPendingMessages())
+                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+                    .batchingMaxMessages(configuration.getMaxPendingMessages())
+                    .enableBatching(configuration.isBatchingEnabled())
+                    .initialSequenceId(configuration.getInitialSequenceId())
+                    .compressionType(configuration.getCompressionType());
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 6b160fd..b2099d3 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -19,6 +19,9 @@ package org.apache.camel.component.pulsar.configuration;
 import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
+import org.apache.pulsar.client.api.CompressionType;
+
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
 
@@ -45,6 +48,24 @@ public class PulsarConfiguration {
     private long ackTimeoutMillis = 10000;
     @UriParam(label = "consumer", defaultValue = "100")
     private long ackGroupTimeMillis = 100;
+    @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
+    private int sendTimeoutMs = 30000;
+    @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
+    private boolean blockIfQueueFull = false;
+    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
+    private int maxPendingMessages = 1000;
+    @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+    private int maxPendingMessagesAcrossPartitions = 50000;
+    @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
+    private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
+    @UriParam(label = "producer", description = "The maximum size to batch messages.", defaultValue = "1000")
+    private int batchingMaxMessages = 1000;
+    @UriParam(label = "producer", description = "Control whether automatic batching of messages is enabled for the producer.", defaultValue = "true")
+    private boolean batchingEnabled = true;
+    @UriParam(label = "producer", description = "The first message published will have a sequence Id of initialSequenceId  1.", defaultValue = "-1")
+    private long initialSequenceId = -1;
+    @UriParam(label = "producer", description = "Compression type to use, defaults to NONE from [NONE, LZ4, ZLIB]", defaultValue = "NONE")
+    private CompressionType compressionType = CompressionType.NONE;
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -159,4 +180,127 @@ public class PulsarConfiguration {
     public void setAckGroupTimeMillis(long ackGroupTimeMillis) {
         this.ackGroupTimeMillis = ackGroupTimeMillis;
     }
+
+    /**
+      * Send timeout in milliseconds.
+      * Defaults to 30,000ms (30 seconds)
+     */
+    public void setSendTimeoutMs(int sendTimeoutMs) {
+        this.sendTimeoutMs = sendTimeoutMs;
+    }
+
+    public int getSendTimeoutMs() {
+        return sendTimeoutMs;
+    }
+
+    /**
+     * Set whether the send and asyncSend operations should block when the outgoing message queue is full.
+     * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left
+     * in the pending queue.
+     * Default is false.
+     */
+    public void setBlockIfQueueFull(boolean blockIfQueueFull) {
+        this.blockIfQueueFull = blockIfQueueFull;
+    }
+
+    public boolean isBlockIfQueueFull() {
+        return blockIfQueueFull;
+    }
+
+    /**
+     * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+     * Default is 1000.
+     */
+    public void setMaxPendingMessages(int maxPendingMessages) {
+        this.maxPendingMessages = maxPendingMessages;
+    }
+
+    public int getMaxPendingMessages() {
+        return maxPendingMessages;
+    }
+
+    /**
+     * Set the number of max pending messages across all the partitions.
+     * Default is 50000.
+     */
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+    }
+
+    public int getMaxPendingMessagesAcrossPartitions() {
+        return maxPendingMessagesAcrossPartitions;
+    }
+
+    /**
+     * Set the time period within which the messages sent will be batched if batch messages are
+     * enabled. If set to a non zero value, messages will be queued until either:
+     * <ul>
+     *  <li>this time interval expires</li>
+     *  <li>the max number of messages in a batch is reached
+     * </ul>
+     * Default is 1ms.
+     */
+    public void setBatchingMaxPublishDelayMicros(long batchingMaxPublishDelayMicros) {
+        this.batchingMaxPublishDelayMicros = batchingMaxPublishDelayMicros;
+    }
+
+    public long getBatchingMaxPublishDelayMicros() {
+        return batchingMaxPublishDelayMicros;
+    }
+
+    /**
+     * Set the maximum number of messages permitted in a batch.
+     * Default 1,000.
+     */
+    public void setBatchingMaxMessages(int batchingMaxMessages) {
+        this.batchingMaxMessages = batchingMaxMessages;
+    }
+
+    public int getBatchingMaxMessages() {
+        return batchingMaxMessages;
+    }
+
+    /**
+     * Control whether automatic batching of messages is enabled for the producer.
+     * Default is true.
+     */
+    public void setBatchingEnabled(boolean batchingEnabled) {
+        this.batchingEnabled = batchingEnabled;
+    }
+
+    public boolean isBatchingEnabled() {
+        return batchingEnabled;
+    }
+
+    /**
+     * Set the baseline for the sequence ids for messages published by the producer.
+     * First message will be using (initialSequenceId  1) as its sequence id and subsequent messages will be assigned
+     * incremental sequence ids, if not otherwise specified.
+     */
+    public void setInitialSequenceId(long initialSequenceId) {
+        this.initialSequenceId = initialSequenceId;
+    }
+
+    public long getInitialSequenceId() {
+        return initialSequenceId;
+    }
+
+    /**
+     *
+     * Set the compression type for the producer.
+     * Supported compression types are:
+     * <ul>
+     *  <li>NONE: No compression</li>
+     *  <li>LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib</li>
+     *  <li>ZLI: Standard ZLib compression</li>
+     * </ul>
+     * Default is NONE
+     */
+    public void setCompressionType(String compressionType) {
+        this.compressionType = CompressionType.valueOf(compressionType.toUpperCase());
+    }
+
+    public CompressionType getCompressionType() {
+        return compressionType;
+    }
 }