You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/14 19:40:40 UTC

[camel] branch camel-2.x updated (1272956 -> 274e5d9)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 1272956  CAMEL-13864: Allow subscriptionName to be set for non-durable shared subscription
     new c56005d  Increase configuration options for Pulsar
     new 274e5d9  Fix checkstyle issues

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/docs/pulsar-component.adoc            |  13 +-
 .../camel/component/pulsar/PulsarProducer.java     |  18 ++-
 .../pulsar/configuration/PulsarConfiguration.java  | 149 ++++++++++++++++++++-
 3 files changed, 174 insertions(+), 6 deletions(-)


[camel] 01/02: Increase configuration options for Pulsar

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c56005d487c0a55e0e375de7c556f82504f5527e
Author: Sherman Richard <sh...@thehutgroup.com>
AuthorDate: Tue Aug 13 11:10:40 2019 +0100

    Increase configuration options for Pulsar
---
 .../camel/component/pulsar/PulsarProducer.java     |  20 ++-
 .../pulsar/configuration/PulsarConfiguration.java  | 147 ++++++++++++++++++++-
 2 files changed, 162 insertions(+), 5 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 06ac5e7..c425400 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,17 +16,18 @@
  */
 package org.apache.camel.component.pulsar;
 
-import java.util.Map;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
+import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 
+import java.util.concurrent.TimeUnit;
+
 public class PulsarProducer extends DefaultProducer {
 
     private final PulsarEndpoint pulsarEndpoint;
@@ -55,7 +56,8 @@ public class PulsarProducer extends DefaultProducer {
     private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
         if (producer == null) {
             final String topicUri = pulsarEndpoint.getTopicUri();
-            String producerName = pulsarEndpoint.getPulsarConfiguration().getProducerName();
+            PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+            String producerName = configuration.getProducerName();
             if (producerName == null) {
                 producerName = topicUri + "-" + Thread.currentThread().getId();
             }
@@ -63,7 +65,17 @@ public class PulsarProducer extends DefaultProducer {
                     .getPulsarClient()
                     .newProducer()
                     .producerName(producerName)
-                    .topic(topicUri);
+                    .topic(topicUri)
+                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
+                    .maxPendingMessages(configuration.getMaxPendingMessages())
+                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+                    .batchingMaxMessages(configuration.getMaxPendingMessages())
+                    .enableBatching(configuration.isBatchingEnabled())
+                    .initialSequenceId(configuration.getInitialSequenceId())
+                    .compressionType(configuration.getCompressionType());
+
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 3c174bb..95441d6 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -19,13 +19,16 @@ package org.apache.camel.component.pulsar.configuration;
 import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
+import org.apache.pulsar.client.api.CompressionType;
+
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
 
 @UriParams
 public class PulsarConfiguration {
 
-    @UriParam(label = "consumer", defaultValue = "subscription")
+    @UriParam(label = "consumer", defaultValue = "subs")
     private String subscriptionName = "subs";
     @UriParam(label = "consumer", enums = "EXCLUSIVE, SHARED, FAILOVER", defaultValue = "EXCLUSIVE")
     private SubscriptionType subscriptionType = EXCLUSIVE;
@@ -39,6 +42,25 @@ public class PulsarConfiguration {
     private String producerName = "default-producer";
     @UriParam(label = "consumer", defaultValue = "cons")
     private String consumerNamePrefix = "cons";
+    @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
+    private int sendTimeoutMs = 30000;
+    @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
+    private boolean blockIfQueueFull = false;
+    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
+    private int maxPendingMessages = 1000;
+    @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+    private int maxPendingMessagesAcrossPartitions = 50000;
+    @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
+    private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
+    @UriParam(label = "producer", description = "The maximum size to batch messages.", defaultValue = "1000")
+    private int batchingMaxMessages = 1000;
+    @UriParam(label = "producer", description = "Control whether automatic batching of messages is enabled for the producer.", defaultValue = "true")
+    private boolean batchingEnabled = true;
+    @UriParam(label = "producer", description = "The first message published will have a sequence Id of initialSequenceId + 1.", defaultValue = "-1")
+    private long initialSequenceId = -1;
+    @UriParam(label = "producer", description = "Compression type to use, defaults to NONE from [NONE, LZ4, ZLIB]", defaultValue = "NONE")
+    private CompressionType compressionType = CompressionType.NONE;
+
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -116,4 +138,127 @@ public class PulsarConfiguration {
     public void setConsumerNamePrefix(String consumerNamePrefix) {
         this.consumerNamePrefix = consumerNamePrefix;
     }
+
+    /**
+     * Send timeout in milliseconds.
+     * Defaults to 30,000ms (30 seconds)
+     */
+    public void setSendTimeoutMs(int sendTimeoutMs) {
+        this.sendTimeoutMs = sendTimeoutMs;
+    }
+
+    public int getSendTimeoutMs() {
+        return sendTimeoutMs;
+    }
+
+    /**
+     * Set whether the send and asyncSend operations should block when the outgoing message queue is full.
+     * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left
+     * in the pending queue.
+     * Default is false.
+     */
+    public void setBlockIfQueueFull(boolean blockIfQueueFull) {
+        this.blockIfQueueFull = blockIfQueueFull;
+    }
+
+    public boolean isBlockIfQueueFull() {
+        return blockIfQueueFull;
+    }
+
+    /**
+     * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+     * Default is 1000.
+     */
+    public void setMaxPendingMessages(int maxPendingMessages) {
+        this.maxPendingMessages = maxPendingMessages;
+    }
+
+    public int getMaxPendingMessages() {
+        return maxPendingMessages;
+    }
+
+    /**
+     * Set the number of max pending messages across all the partitions.
+     * Default is 50000.
+     */
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+    }
+
+    public int getMaxPendingMessagesAcrossPartitions() {
+        return maxPendingMessagesAcrossPartitions;
+    }
+
+    /**
+     * Set the time period within which the messages sent will be batched if batch messages are
+     * enabled. If set to a non zero value, messages will be queued until either:
+     * <ul>
+     *  <li>this time interval expires</li>
+     *  <li>the max number of messages in a batch is reached
+     * </ul>
+     * Default is 1ms.
+     */
+    public void setBatchingMaxPublishDelayMicros(long batchingMaxPublishDelayMicros) {
+        this.batchingMaxPublishDelayMicros = batchingMaxPublishDelayMicros;
+    }
+
+    public long getBatchingMaxPublishDelayMicros() {
+        return batchingMaxPublishDelayMicros;
+    }
+
+    /**
+     * Set the maximum number of messages permitted in a batch.
+     * Default 1,000.
+     */
+    public void setBatchingMaxMessages(int batchingMaxMessages) {
+        this.batchingMaxMessages = batchingMaxMessages;
+    }
+
+    public int getBatchingMaxMessages() {
+        return batchingMaxMessages;
+    }
+
+    /**
+     * Control whether automatic batching of messages is enabled for the producer.
+     * Default is true.
+     */
+    public void setBatchingEnabled(boolean batchingEnabled) {
+        this.batchingEnabled = batchingEnabled;
+    }
+
+    public boolean isBatchingEnabled() {
+        return batchingEnabled;
+    }
+
+    /**
+     * Set the baseline for the sequence ids for messages published by the producer.
+     * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned
+     * incremental sequence ids, if not otherwise specified.
+     */
+    public void setInitialSequenceId(long initialSequenceId) {
+        this.initialSequenceId = initialSequenceId;
+    }
+
+    public long getInitialSequenceId() {
+        return initialSequenceId;
+    }
+
+    /**
+     *
+     * Set the compression type for the producer.
+     * Supported compression types are:
+     * <ul>
+     *  <li>NONE: No compression</li>
+     *  <li>LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib</li>
+     *  <li>ZLI: Standard ZLib compression</li>
+     * </ul>
+     * Default is NONE
+     */
+    public void setCompressionType(String compressionType) {
+        this.compressionType = CompressionType.valueOf(compressionType.toUpperCase());
+    }
+
+    public CompressionType getCompressionType() {
+        return compressionType;
+    }
 }


[camel] 02/02: Fix checkstyle issues

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 274e5d916fe9dce56f1756c8ef675060182293c0
Author: Sherman Richard <sh...@thehutgroup.com>
AuthorDate: Tue Aug 13 15:18:46 2019 +0100

    Fix checkstyle issues
---
 components/camel-pulsar/src/main/docs/pulsar-component.adoc | 13 +++++++++++--
 .../org/apache/camel/component/pulsar/PulsarProducer.java   |  4 ++--
 .../component/pulsar/configuration/PulsarConfiguration.java | 12 +++++++-----
 3 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index a8f2193..7c9ca4c 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -64,7 +64,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (11 parameters):
+==== Query Parameters (20 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -75,11 +75,20 @@ with the following path and query parameters:
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String
 | *consumerQueueSize* (consumer) | Size of the consumer queue - defaults to 10 | 10 | int
 | *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 | int
-| *subscriptionName* (consumer) | Name of the subscription to use | subscription | String
+| *subscriptionName* (consumer) | Name of the subscription to use | subs | String
 | *subscriptionType* (consumer) | Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to EXCLUSIVE | EXCLUSIVE | SubscriptionType
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
+| *batchingEnabled* (producer) | Control whether automatic batching of messages is enabled for the producer. Default is true. | true | boolean
+| *batchingMaxMessages* (producer) | Set the maximum number of messages permitted in a batch. Default 1,000. | 1000 | int
+| *batchingMaxPublishDelay Micros* (producer) | Set the time period within which the messages sent will be batched if batch messages are enabled. If set to a non zero value, messages will be queued until either: this time interval expires the max number of messages in a batch is reached Default is 1ms. | 1000 | long
+| *blockIfQueueFull* (producer) | Set whether the send and asyncSend operations should block when the outgoing message queue is full. If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left in the pending queue. Default is false. | false | boolean
+| *compressionType* (producer) | Set the compression type for the producer. Supported compression types are: NONE: No compression LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib ZLI: Standard ZLib compression Default is NONE | NONE | CompressionType
+| *initialSequenceId* (producer) | Set the baseline for the sequence ids for messages published by the producer. First message will be using (initialSequenceId 1) as its sequence id and subsequent messages will be assigned incremental sequence ids, if not otherwise specified. | -1 | long
+| *maxPendingMessages* (producer) | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. Default is 1000. | 1000 | int
+| *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
 | *producerName* (producer) | Name of the producer | default-producer | String
+| *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
 |===
 // endpoint options: END
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index c425400..1d6673a 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.pulsar;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
@@ -26,8 +28,6 @@ import org.apache.camel.impl.DefaultProducer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 
-import java.util.concurrent.TimeUnit;
-
 public class PulsarProducer extends DefaultProducer {
 
     private final PulsarEndpoint pulsarEndpoint;
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 95441d6..fa4772f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -16,13 +16,13 @@
  */
 package org.apache.camel.component.pulsar.configuration;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.pulsar.client.api.CompressionType;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
 
 @UriParams
@@ -45,10 +45,12 @@ public class PulsarConfiguration {
     @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
     private int sendTimeoutMs = 30000;
     @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
-    private boolean blockIfQueueFull = false;
-    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
+    private boolean blockIfQueueFull;
+    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true",
+            defaultValue = "1000")
     private int maxPendingMessages = 1000;
-    @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+    @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if "
+            + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
     private int maxPendingMessagesAcrossPartitions = 50000;
     @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
     private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);