You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/14 19:40:41 UTC

[camel] 01/02: Increase configuration options for Pulsar

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c56005d487c0a55e0e375de7c556f82504f5527e
Author: Sherman Richard <sh...@thehutgroup.com>
AuthorDate: Tue Aug 13 11:10:40 2019 +0100

    Increase configuration options for Pulsar
---
 .../camel/component/pulsar/PulsarProducer.java     |  20 ++-
 .../pulsar/configuration/PulsarConfiguration.java  | 147 ++++++++++++++++++++-
 2 files changed, 162 insertions(+), 5 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 06ac5e7..c425400 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,17 +16,18 @@
  */
 package org.apache.camel.component.pulsar;
 
-import java.util.Map;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
+import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 
+import java.util.concurrent.TimeUnit;
+
 public class PulsarProducer extends DefaultProducer {
 
     private final PulsarEndpoint pulsarEndpoint;
@@ -55,7 +56,8 @@ public class PulsarProducer extends DefaultProducer {
     private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
         if (producer == null) {
             final String topicUri = pulsarEndpoint.getTopicUri();
-            String producerName = pulsarEndpoint.getPulsarConfiguration().getProducerName();
+            PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+            String producerName = configuration.getProducerName();
             if (producerName == null) {
                 producerName = topicUri + "-" + Thread.currentThread().getId();
             }
@@ -63,7 +65,17 @@ public class PulsarProducer extends DefaultProducer {
                     .getPulsarClient()
                     .newProducer()
                     .producerName(producerName)
-                    .topic(topicUri);
+                    .topic(topicUri)
+                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
+                    .maxPendingMessages(configuration.getMaxPendingMessages())
+                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+                    .batchingMaxMessages(configuration.getMaxPendingMessages())
+                    .enableBatching(configuration.isBatchingEnabled())
+                    .initialSequenceId(configuration.getInitialSequenceId())
+                    .compressionType(configuration.getCompressionType());
+
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 3c174bb..95441d6 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -19,13 +19,16 @@ package org.apache.camel.component.pulsar.configuration;
 import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
+import org.apache.pulsar.client.api.CompressionType;
+
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
 
 @UriParams
 public class PulsarConfiguration {
 
-    @UriParam(label = "consumer", defaultValue = "subscription")
+    @UriParam(label = "consumer", defaultValue = "subs")
     private String subscriptionName = "subs";
     @UriParam(label = "consumer", enums = "EXCLUSIVE, SHARED, FAILOVER", defaultValue = "EXCLUSIVE")
     private SubscriptionType subscriptionType = EXCLUSIVE;
@@ -39,6 +42,25 @@ public class PulsarConfiguration {
     private String producerName = "default-producer";
     @UriParam(label = "consumer", defaultValue = "cons")
     private String consumerNamePrefix = "cons";
+    @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
+    private int sendTimeoutMs = 30000;
+    @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
+    private boolean blockIfQueueFull = false;
+    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
+    private int maxPendingMessages = 1000;
+    @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+    private int maxPendingMessagesAcrossPartitions = 50000;
+    @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
+    private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
+    @UriParam(label = "producer", description = "The maximum size to batch messages.", defaultValue = "1000")
+    private int batchingMaxMessages = 1000;
+    @UriParam(label = "producer", description = "Control whether automatic batching of messages is enabled for the producer.", defaultValue = "true")
+    private boolean batchingEnabled = true;
+    @UriParam(label = "producer", description = "The first message published will have a sequence Id of initialSequenceId + 1.", defaultValue = "-1")
+    private long initialSequenceId = -1;
+    @UriParam(label = "producer", description = "Compression type to use, defaults to NONE from [NONE, LZ4, ZLIB]", defaultValue = "NONE")
+    private CompressionType compressionType = CompressionType.NONE;
+
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -116,4 +138,127 @@ public class PulsarConfiguration {
     public void setConsumerNamePrefix(String consumerNamePrefix) {
         this.consumerNamePrefix = consumerNamePrefix;
     }
+
+    /**
+     * Send timeout in milliseconds.
+     * Defaults to 30,000ms (30 seconds)
+     */
+    public void setSendTimeoutMs(int sendTimeoutMs) {
+        this.sendTimeoutMs = sendTimeoutMs;
+    }
+
+    public int getSendTimeoutMs() {
+        return sendTimeoutMs;
+    }
+
+    /**
+     * Set whether the send and asyncSend operations should block when the outgoing message queue is full.
+     * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left
+     * in the pending queue.
+     * Default is false.
+     */
+    public void setBlockIfQueueFull(boolean blockIfQueueFull) {
+        this.blockIfQueueFull = blockIfQueueFull;
+    }
+
+    public boolean isBlockIfQueueFull() {
+        return blockIfQueueFull;
+    }
+
+    /**
+     * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+     * Default is 1000.
+     */
+    public void setMaxPendingMessages(int maxPendingMessages) {
+        this.maxPendingMessages = maxPendingMessages;
+    }
+
+    public int getMaxPendingMessages() {
+        return maxPendingMessages;
+    }
+
+    /**
+     * Set the number of max pending messages across all the partitions.
+     * Default is 50000.
+     */
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+    }
+
+    public int getMaxPendingMessagesAcrossPartitions() {
+        return maxPendingMessagesAcrossPartitions;
+    }
+
+    /**
+     * Set the time period within which the messages sent will be batched if batch messages are
+     * enabled. If set to a non zero value, messages will be queued until either:
+     * <ul>
+     *  <li>this time interval expires</li>
+     *  <li>the max number of messages in a batch is reached
+     * </ul>
+     * Default is 1ms.
+     */
+    public void setBatchingMaxPublishDelayMicros(long batchingMaxPublishDelayMicros) {
+        this.batchingMaxPublishDelayMicros = batchingMaxPublishDelayMicros;
+    }
+
+    public long getBatchingMaxPublishDelayMicros() {
+        return batchingMaxPublishDelayMicros;
+    }
+
+    /**
+     * Set the maximum number of messages permitted in a batch.
+     * Default 1,000.
+     */
+    public void setBatchingMaxMessages(int batchingMaxMessages) {
+        this.batchingMaxMessages = batchingMaxMessages;
+    }
+
+    public int getBatchingMaxMessages() {
+        return batchingMaxMessages;
+    }
+
+    /**
+     * Control whether automatic batching of messages is enabled for the producer.
+     * Default is true.
+     */
+    public void setBatchingEnabled(boolean batchingEnabled) {
+        this.batchingEnabled = batchingEnabled;
+    }
+
+    public boolean isBatchingEnabled() {
+        return batchingEnabled;
+    }
+
+    /**
+     * Set the baseline for the sequence ids for messages published by the producer.
+     * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned
+     * incremental sequence ids, if not otherwise specified.
+     */
+    public void setInitialSequenceId(long initialSequenceId) {
+        this.initialSequenceId = initialSequenceId;
+    }
+
+    public long getInitialSequenceId() {
+        return initialSequenceId;
+    }
+
+    /**
+     *
+     * Set the compression type for the producer.
+     * Supported compression types are:
+     * <ul>
+     *  <li>NONE: No compression</li>
+     *  <li>LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib</li>
+     *  <li>ZLI: Standard ZLib compression</li>
+     * </ul>
+     * Default is NONE
+     */
+    public void setCompressionType(String compressionType) {
+        this.compressionType = CompressionType.valueOf(compressionType.toUpperCase());
+    }
+
+    public CompressionType getCompressionType() {
+        return compressionType;
+    }
 }