You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/09/20 10:44:38 UTC

[camel] branch master updated (f076af5 -> 6125500)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from f076af5  CAMEL-13982: Handle gauge metrics not being reusable
     new f8a5b7b  CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option to producer, messageRouting mode option
     new 56cb7e3  CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option to producer, messageRouter option
     new 10de4ba  Camel-Pulsar: Fixed CS
     new f6d1cb1  Camel-Pulsar: Regen
     new 6125500  Regen docs for Camel-Pulsar on the website side

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/docs/pulsar-component.adoc            |  4 +-
 .../camel/component/pulsar/PulsarComponent.java    | 12 ++-
 .../camel/component/pulsar/PulsarEndpoint.java     | 14 ++--
 .../component/pulsar/PulsarMessageListener.java    |  6 +-
 .../component/pulsar/PulsarMessageReceipt.java     | 17 ++--
 .../pulsar/PulsarMessageReceiptFactory.java        |  9 ++-
 .../camel/component/pulsar/PulsarProducer.java     | 28 +++----
 .../pulsar/configuration/PulsarConfiguration.java  | 94 +++++++++++++++-------
 .../camel/component/pulsar/utils/PulsarPath.java   |  2 +-
 .../consumers/CommonCreationStrategyImpl.java      |  5 +-
 .../pulsar/utils/message/PulsarMessageUtils.java   |  2 +-
 .../pulsar/PulsarConcurrentConsumerInTest.java     |  4 +-
 .../pulsar/PulsarConcurrentProducerInTest.java     |  4 +-
 .../pulsar/PulsarConsumerAcknowledgementTest.java  | 31 ++-----
 .../component/pulsar/PulsarConsumerInTest.java     | 19 ++---
 .../PulsarConsumerNoAcknowledgementTest.java       | 22 ++---
 .../pulsar/PulsarCustomMessageReceiptTest.java     | 27 ++-----
 .../component/pulsar/PulsarProducerInTest.java     | 16 +---
 .../camel/component/pulsar/PulsarTestSupport.java  |  4 +-
 .../component/pulsar/utils/PulsarUtilsTest.java    |  4 +-
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 60 ++++++++++++++
 .../modules/ROOT/pages/pulsar-component.adoc       |  4 +-
 22 files changed, 221 insertions(+), 167 deletions(-)


[camel] 03/05: Camel-Pulsar: Fixed CS

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 10de4bab2e76046b9535128bf38bf67b80c3f3f4
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Sep 20 12:38:40 2019 +0200

    Camel-Pulsar: Fixed CS
---
 .../camel/component/pulsar/PulsarComponent.java    | 12 ++-
 .../camel/component/pulsar/PulsarEndpoint.java     | 14 ++--
 .../component/pulsar/PulsarMessageListener.java    |  6 +-
 .../component/pulsar/PulsarMessageReceipt.java     | 17 ++--
 .../pulsar/PulsarMessageReceiptFactory.java        |  9 ++-
 .../camel/component/pulsar/PulsarProducer.java     | 24 ++----
 .../pulsar/configuration/PulsarConfiguration.java  | 92 +++++++++++-----------
 .../camel/component/pulsar/utils/PulsarPath.java   |  2 +-
 .../consumers/CommonCreationStrategyImpl.java      |  5 +-
 .../pulsar/utils/message/PulsarMessageUtils.java   |  2 +-
 .../pulsar/PulsarConcurrentConsumerInTest.java     |  4 +-
 .../pulsar/PulsarConcurrentProducerInTest.java     |  4 +-
 .../pulsar/PulsarConsumerAcknowledgementTest.java  | 31 ++------
 .../component/pulsar/PulsarConsumerInTest.java     | 19 ++---
 .../PulsarConsumerNoAcknowledgementTest.java       | 22 ++----
 .../pulsar/PulsarCustomMessageReceiptTest.java     | 27 ++-----
 .../component/pulsar/PulsarProducerInTest.java     | 16 +---
 .../camel/component/pulsar/PulsarTestSupport.java  |  4 +-
 .../component/pulsar/utils/PulsarUtilsTest.java    |  4 +-
 19 files changed, 134 insertions(+), 180 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
index b43ab1d..08e6f65 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
@@ -108,9 +108,12 @@ public class PulsarComponent extends DefaultComponent {
     /**
      * Whether to allow manual message acknowledgements.
      * <p/>
-     * If this option is enabled, then messages are not immediately acknowledged after being consumed.
-     * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
-     * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+     * If this option is enabled, then messages are not immediately acknowledged
+     * after being consumed. Instead, an instance of
+     * {@link PulsarMessageReceipt} is stored as a header on the
+     * {@link org.apache.camel.Exchange}. Messages can then be acknowledged
+     * using {@link PulsarMessageReceipt} at any time before the ackTimeout
+     * occurs.
      */
     public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
         this.allowManualAcknowledgement = allowManualAcknowledgement;
@@ -121,7 +124,8 @@ public class PulsarComponent extends DefaultComponent {
     }
 
     /**
-     * Provide a factory to create an alternate implementation of {@link PulsarMessageReceipt}.
+     * Provide a factory to create an alternate implementation of
+     * {@link PulsarMessageReceipt}.
      */
     public void setPulsarMessageReceiptFactory(PulsarMessageReceiptFactory pulsarMessageReceiptFactory) {
         this.pulsarMessageReceiptFactory = pulsarMessageReceiptFactory;
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
index 0330677..5ab8bbe 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
@@ -34,13 +34,17 @@ public class PulsarEndpoint extends DefaultEndpoint {
     private PulsarClient pulsarClient;
     private String uri;
 
-    @UriPath(enums = "persistent,non-persistent") @Metadata(required = true)
+    @UriPath(enums = "persistent,non-persistent")
+    @Metadata(required = true)
     private String persistence;
-    @UriPath @Metadata(required = true)
+    @UriPath
+    @Metadata(required = true)
     private String tenant;
-    @UriPath @Metadata(required = true)
+    @UriPath
+    @Metadata(required = true)
     private String namespace;
-    @UriPath @Metadata(required = true)
+    @UriPath
+    @Metadata(required = true)
     private String topic;
     @UriParam
     private PulsarConfiguration pulsarConfiguration;
@@ -140,6 +144,6 @@ public class PulsarEndpoint extends DefaultEndpoint {
 
     @Override
     public PulsarComponent getComponent() {
-        return (PulsarComponent) super.getComponent();
+        return (PulsarComponent)super.getComponent();
     }
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 55249fc..11e87fb 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -47,8 +47,7 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
 
         try {
             if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
-                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
-                        endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
+                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
                 processor.process(exchange);
             } else {
                 processor.process(exchange);
@@ -60,8 +59,7 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
     }
 
     private void handleProcessorException(final Exchange exchange, final Exception exception) {
-        final Exchange exchangeWithException = PulsarMessageUtils
-            .updateExchangeWithException(exception, exchange);
+        final Exchange exchangeWithException = PulsarMessageUtils.updateExchangeWithException(exception, exchange);
 
         exceptionHandler.handleException("An error occurred", exchangeWithException, exception);
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
index 7cdfd90..bb2f849 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
@@ -25,8 +25,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
 /**
  * Acknowledge the receipt of a message using the Pulsar consumer.
  * <p>
- * Available on the {@link Exchange} if {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true.
- * An alternative to the default may be provided by implementing {@link PulsarMessageReceiptFactory}.
+ * Available on the {@link Exchange} if
+ * {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true. An
+ * alternative to the default may be provided by implementing
+ * {@link PulsarMessageReceiptFactory}.
  */
 public interface PulsarMessageReceipt {
 
@@ -38,7 +40,8 @@ public interface PulsarMessageReceipt {
     void acknowledge() throws PulsarClientException;
 
     /**
-     * Acknowledge receipt of all of the messages in the stream up to and including this message synchronously.
+     * Acknowledge receipt of all of the messages in the stream up to and
+     * including this message synchronously.
      *
      * @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulative(MessageId)
      */
@@ -52,7 +55,8 @@ public interface PulsarMessageReceipt {
     CompletableFuture<Void> acknowledgeAsync();
 
     /**
-     * Acknowledge receipt of all of the messages in the stream up to and including this message asynchronously.
+     * Acknowledge receipt of all of the messages in the stream up to and
+     * including this message asynchronously.
      *
      * @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulativeAsync(MessageId)
      */
@@ -62,9 +66,10 @@ public interface PulsarMessageReceipt {
      * Acknowledge the failure to process this message.
      *
      * @see org.apache.pulsar.client.api.Consumer#negativeAcknowledge(MessageId)
-     * Note: Available in Puslar 2.4.0. Implementations with earlier versions should return an {@link java.lang.UnsupportedOperationException}.
+     *      Note: Available in Puslar 2.4.0. Implementations with earlier
+     *      versions should return an
+     *      {@link java.lang.UnsupportedOperationException}.
      */
     void negativeAcknowledge();
 
 }
-
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
index 314ae95..0a09723 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
@@ -21,10 +21,13 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 
 /**
- * Factory to create a new {@link PulsarMessageReceipt} to store on the {@link Exchange}.
+ * Factory to create a new {@link PulsarMessageReceipt} to store on the
+ * {@link Exchange}.
  * <p>
- * Implement this interface if an alternate implementation of {@link PulsarMessageReceipt} is required
- * as newer Pulsar clients may have acknowledgement functionality not yet supported by {@link DefaultPulsarMessageReceipt}.
+ * Implement this interface if an alternate implementation of
+ * {@link PulsarMessageReceipt} is required as newer Pulsar clients may have
+ * acknowledgement functionality not yet supported by
+ * {@link DefaultPulsarMessageReceipt}.
  */
 public interface PulsarMessageReceiptFactory {
 
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index d7c8b0a..5503949 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -44,8 +44,7 @@ public class PulsarProducer extends DefaultProducer {
         final Message message = exchange.getIn();
         byte[] body;
         try {
-            body = exchange.getContext().getTypeConverter()
-                    .mandatoryConvertTo(byte[].class, exchange, message.getBody());
+            body = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, message.getBody());
         } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
             // fallback to try serialize the data
             body = PulsarMessageUtils.serialize(message.getBody());
@@ -61,24 +60,15 @@ public class PulsarProducer extends DefaultProducer {
             if (producerName == null) {
                 producerName = topicUri + "-" + Thread.currentThread().getId();
             }
-            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint
-                    .getPulsarClient()
-                    .newProducer()
-                    .producerName(producerName)
-                    .topic(topicUri)
-                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
-                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
-                    .maxPendingMessages(configuration.getMaxPendingMessages())
-                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
-                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
-                    .batchingMaxMessages(configuration.getMaxPendingMessages())
-                    .enableBatching(configuration.isBatchingEnabled())
-                    .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType());
+            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().producerName(producerName).topic(topicUri)
+                .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull())
+                .maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages())
+                .enableBatching(configuration.isBatchingEnabled()).initialSequenceId(configuration.getInitialSequenceId()).compressionType(configuration.getCompressionType());
             if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
                 producerBuilder.messageRouter(configuration.getMessageRouter());
             } else {
-            	producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+                producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
             }
             producer = producerBuilder.create();
         }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index ea34312..b60bfed 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -55,11 +55,10 @@ public class PulsarConfiguration {
     private int sendTimeoutMs = 30000;
     @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
     private boolean blockIfQueueFull;
-    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true",
-            defaultValue = "1000")
+    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
     private int maxPendingMessages = 1000;
     @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if "
-            + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+                                                + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
     private int maxPendingMessagesAcrossPartitions = 50000;
     @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
     private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
@@ -92,7 +91,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to EXCLUSIVE
+     * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to
+     * EXCLUSIVE
      */
     public void setSubscriptionType(SubscriptionType subscriptionType) {
         this.subscriptionType = subscriptionType;
@@ -147,7 +147,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Prefix to add to consumer names when a SHARED or FAILOVER subscription is used
+     * Prefix to add to consumer names when a SHARED or FAILOVER subscription is
+     * used
      */
     public void setConsumerNamePrefix(String consumerNamePrefix) {
         this.consumerNamePrefix = consumerNamePrefix;
@@ -160,9 +161,12 @@ public class PulsarConfiguration {
     /**
      * Whether to allow manual message acknowledgements.
      * <p/>
-     * If this option is enabled, then messages are not immediately acknowledged after being consumed.
-     * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
-     * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+     * If this option is enabled, then messages are not immediately acknowledged
+     * after being consumed. Instead, an instance of
+     * {@link PulsarMessageReceipt} is stored as a header on the
+     * {@link org.apache.camel.Exchange}. Messages can then be acknowledged
+     * using {@link PulsarMessageReceipt} at any time before the ackTimeout
+     * occurs.
      */
     public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
         this.allowManualAcknowledgement = allowManualAcknowledgement;
@@ -184,15 +188,15 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100
+     * Group the consumer acknowledgments for the specified time in milliseconds
+     * - defaults to 100
      */
     public void setAckGroupTimeMillis(long ackGroupTimeMillis) {
         this.ackGroupTimeMillis = ackGroupTimeMillis;
     }
 
     /**
-      * Send timeout in milliseconds.
-      * Defaults to 30,000ms (30 seconds)
+     * Send timeout in milliseconds. Defaults to 30,000ms (30 seconds)
      */
     public void setSendTimeoutMs(int sendTimeoutMs) {
         this.sendTimeoutMs = sendTimeoutMs;
@@ -203,10 +207,10 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set whether the send and asyncSend operations should block when the outgoing message queue is full.
-     * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left
-     * in the pending queue.
-     * Default is false.
+     * Set whether the send and asyncSend operations should block when the
+     * outgoing message queue is full. If set to false, send operations will
+     * immediately fail with ProducerQueueIsFullError when there is no space
+     * left in the pending queue. Default is false.
      */
     public void setBlockIfQueueFull(boolean blockIfQueueFull) {
         this.blockIfQueueFull = blockIfQueueFull;
@@ -217,8 +221,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
-     * Default is 1000.
+     * Set the max size of the queue holding the messages pending to receive an
+     * acknowledgment from the broker. Default is 1000.
      */
     public void setMaxPendingMessages(int maxPendingMessages) {
         this.maxPendingMessages = maxPendingMessages;
@@ -229,8 +233,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the number of max pending messages across all the partitions.
-     * Default is 50000.
+     * Set the number of max pending messages across all the partitions. Default
+     * is 50000.
      */
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
         this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
@@ -241,11 +245,12 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the time period within which the messages sent will be batched if batch messages are
-     * enabled. If set to a non zero value, messages will be queued until either:
+     * Set the time period within which the messages sent will be batched if
+     * batch messages are enabled. If set to a non zero value, messages will be
+     * queued until either:
      * <ul>
-     *  <li>this time interval expires</li>
-     *  <li>the max number of messages in a batch is reached
+     * <li>this time interval expires</li>
+     * <li>the max number of messages in a batch is reached
      * </ul>
      * Default is 1ms.
      */
@@ -258,8 +263,7 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the maximum number of messages permitted in a batch.
-     * Default 1,000.
+     * Set the maximum number of messages permitted in a batch. Default 1,000.
      */
     public void setBatchingMaxMessages(int batchingMaxMessages) {
         this.batchingMaxMessages = batchingMaxMessages;
@@ -270,8 +274,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Control whether automatic batching of messages is enabled for the producer.
-     * Default is true.
+     * Control whether automatic batching of messages is enabled for the
+     * producer. Default is true.
      */
     public void setBatchingEnabled(boolean batchingEnabled) {
         this.batchingEnabled = batchingEnabled;
@@ -282,9 +286,10 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the baseline for the sequence ids for messages published by the producer.
-     * First message will be using (initialSequenceId  1) as its sequence id and subsequent messages will be assigned
-     * incremental sequence ids, if not otherwise specified.
+     * Set the baseline for the sequence ids for messages published by the
+     * producer. First message will be using (initialSequenceId 1) as its
+     * sequence id and subsequent messages will be assigned incremental sequence
+     * ids, if not otherwise specified.
      */
     public void setInitialSequenceId(long initialSequenceId) {
         this.initialSequenceId = initialSequenceId;
@@ -315,24 +320,23 @@ public class PulsarConfiguration {
     /**
      * Set the message routing mode for the producer.
      */
-	public MessageRoutingMode getMessageRoutingMode() {
-		return messageRoutingMode;
-	}
+    public MessageRoutingMode getMessageRoutingMode() {
+        return messageRoutingMode;
+    }
 
-	public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
-		this.messageRoutingMode = messageRoutingMode;
-	}
+    public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
+        this.messageRoutingMode = messageRoutingMode;
+    }
 
     /**
      * Set a custom Message Router.
      */
-	public MessageRouter getMessageRouter() {
-		return messageRouter;
-	}
-
-	public void setMessageRouter(MessageRouter messageRouter) {
-		this.messageRouter = messageRouter;
-	}
-    
+    public MessageRouter getMessageRouter() {
+        return messageRouter;
+    }
+
+    public void setMessageRouter(MessageRouter messageRouter) {
+        this.messageRouter = messageRouter;
+    }
 
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
index 1af739f..eb3b63e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
@@ -58,4 +58,4 @@ public class PulsarPath {
     public boolean isAutoConfigurable() {
         return autoConfigurable;
     }
-}
\ No newline at end of file
+}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index a96e369..de2ff9c 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -24,7 +24,7 @@ import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 
 public final class CommonCreationStrategyImpl {
-    
+
     private CommonCreationStrategyImpl() {
     }
 
@@ -32,8 +32,7 @@ public final class CommonCreationStrategyImpl {
         final PulsarConfiguration endpointConfiguration = pulsarEndpoint.getPulsarConfiguration();
 
         return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
-            .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name)
-            .ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS)
+            .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS)
             .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS)
             .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
index 8332c2a..860337f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
@@ -35,7 +35,7 @@ import static org.apache.camel.component.pulsar.utils.message.PulsarMessageHeade
 import static org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders.TOPIC_NAME;
 
 public final class PulsarMessageUtils {
-    
+
     private PulsarMessageUtils() {
     }
 
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
index a4bb747..c859d30 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
@@ -45,7 +45,7 @@ public class PulsarConcurrentConsumerInTest extends PulsarTestSupport {
     private static final int NUMBER_OF_CONSUMERS = 5;
 
     @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=5&subscriptionType=Shared"
-                          + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerNamePrefix=camel-consumer-")
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerNamePrefix=camel-consumer-")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -72,7 +72,7 @@ public class PulsarConcurrentConsumerInTest extends PulsarTestSupport {
     @Override
     protected Registry createCamelRegistry() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
-        
+
         registerPulsarBeans(registry);
 
         return registry;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
index d6aac83..d3b4622 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
@@ -43,7 +43,7 @@ public class PulsarConcurrentProducerInTest extends PulsarTestSupport {
     private ProducerTemplate producerTemplate;
 
     @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=3&subscriptionType=Shared" + "&subscriptionName=camel-subscription&consumerQueueSize=5"
-                          + "&consumerNamePrefix=camel-consumer" + "&producerName=" + PRODUCER)
+                    + "&consumerNamePrefix=camel-consumer" + "&producerName=" + PRODUCER)
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -64,7 +64,7 @@ public class PulsarConcurrentProducerInTest extends PulsarTestSupport {
     @Override
     protected Registry createCamelRegistry() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
-        
+
         registerPulsarBeans(registry);
 
         return registry;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 0ce2dcc..e3195b3 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -42,12 +42,8 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     private static final String TOPIC_URI = "persistent://public/default/camel-topic";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject(uri = "pulsar:" + TOPIC_URI
-            + "?numberOfConsumers=1&subscriptionType=Exclusive"
-            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-            + "&allowManualAcknowledgement=true"
-            + "&ackTimeoutMillis=1000"
-    )
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                          + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
@@ -58,10 +54,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     @Before
     public void setup() throws Exception {
         context.removeRoute("myRoute");
-        producer = givenPulsarClient().newProducer(Schema.STRING)
-                .producerName(PRODUCER)
-                .topic(TOPIC_URI)
-                .create();
+        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
     }
 
     @Override
@@ -85,11 +78,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-                .serviceUrl(getPulsarBrokerUrl())
-                .ioThreads(1)
-                .listenerThreads(1)
-                .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
@@ -102,8 +91,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     receipt.acknowledge();
                 });
             }
@@ -124,8 +112,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     try {
                         CompletableFuture<Void> f = receipt.acknowledgeAsync();
                         f.get();
@@ -151,8 +138,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     // Ack the second message. The first will also be acked.
                     if (exchange.getIn().getBody().equals("Hello World Again!")) {
                         receipt.acknowledgeCumulative();
@@ -177,8 +163,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     // Ack the second message. The first will also be acked.
                     if (exchange.getIn().getBody().equals("Hello World Again!")) {
                         try {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
index 2590290..8ff9e91 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
@@ -43,10 +43,8 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
     private static final String TOPIC_URI = "persistent://public/default/camel-topic";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject("pulsar:" + TOPIC_URI
-        + "?numberOfConsumers=1&subscriptionType=Exclusive"
-        + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-    )
+    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -73,7 +71,7 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
     @Override
     protected Registry createCamelRegistry() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
-        
+
         registerPulsarBeans(registry);
 
         return registry;
@@ -92,21 +90,14 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-            .serviceUrl(getPulsarBrokerUrl())
-            .ioThreads(1)
-            .listenerThreads(1)
-            .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
     public void testAMessageToClusterIsConsumed() throws Exception {
         to.expectedMessageCount(1);
 
-        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING)
-            .producerName(PRODUCER)
-            .topic(TOPIC_URI)
-            .create();
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
 
         producer.send("Hello World!");
 
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
index 1cb5527..f57b4a5 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
@@ -36,11 +36,8 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
     private static final String TOPIC_URI = "persistent://public/default/camel-topic";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject(uri = "pulsar:" + TOPIC_URI
-            + "?numberOfConsumers=1&subscriptionType=Exclusive"
-            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-            + "&ackTimeoutMillis=1000"
-    )
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                          + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&ackTimeoutMillis=1000")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
@@ -74,26 +71,21 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
-        comp.setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+        comp.setAllowManualAcknowledgement(true); // Set to true here instead of
+                                                  // the endpoint query
+                                                  // parameter.
         jndi.bind("pulsar", comp);
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-                .serviceUrl(getPulsarBrokerUrl())
-                .ioThreads(1)
-                .listenerThreads(1)
-                .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
     public void testAMessageIsConsumedMultipleTimes() throws Exception {
         to.expectedMinimumMessageCount(2);
 
-        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING)
-                .producerName(PRODUCER)
-                .topic(TOPIC_URI)
-                .create();
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
 
         producer.send("Hello World!");
 
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
index 24a9132..eea202e 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
@@ -53,12 +53,8 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
 
     public PulsarMessageReceipt mockPulsarMessageReceipt = mock(PulsarMessageReceipt.class);
 
-    @EndpointInject(uri = "pulsar:" + TOPIC_URI
-            + "?numberOfConsumers=1&subscriptionType=Exclusive"
-            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-            + "&allowManualAcknowledgement=true"
-            + "&ackTimeoutMillis=1000"
-    )
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                          + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
@@ -68,10 +64,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
 
     @Before
     public void setup() throws Exception {
-        producer = givenPulsarClient().newProducer(Schema.STRING)
-                .producerName(PRODUCER)
-                .topic(TOPIC_URI)
-                .create();
+        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
     }
 
     @Override
@@ -97,11 +90,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-                .serviceUrl(getPulsarBrokerUrl())
-                .ioThreads(1)
-                .listenerThreads(1)
-                .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
@@ -116,8 +105,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     receipt.acknowledge();
                 });
             }
@@ -127,10 +115,11 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
 
-        // The mock does not actually acknowledge using the pulsar consumer, so the message will be re-consumed
+        // The mock does not actually acknowledge using the pulsar consumer, so
+        // the message will be re-consumed
         // after the ackTimeout.
         verify(mockPulsarMessageReceipt, atLeast(2)).acknowledge();
         verifyNoMoreInteractions(mockPulsarMessageReceipt);
     }
-    
+
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
index c66ca1c..52b274e 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
@@ -40,12 +40,8 @@ public class PulsarProducerInTest extends PulsarTestSupport {
     @Produce("direct:start")
     private ProducerTemplate producerTemplate;
 
-    @EndpointInject("pulsar:" + TOPIC_URI
-        + "?numberOfConsumers=1&subscriptionType=Exclusive"
-        + "&subscriptionName=camel-subscription&consumerQueueSize=1"
-        + "&consumerName=camel-consumer"
-        + "&producerName=" + PRODUCER
-    )
+    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + "&subscriptionName=camel-subscription&consumerQueueSize=1"
+                    + "&consumerName=camel-consumer" + "&producerName=" + PRODUCER)
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -66,7 +62,7 @@ public class PulsarProducerInTest extends PulsarTestSupport {
     @Override
     protected Registry createCamelRegistry() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
-        
+
         registerPulsarBeans(registry);
 
         return registry;
@@ -85,11 +81,7 @@ public class PulsarProducerInTest extends PulsarTestSupport {
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-            .serviceUrl(getPulsarBrokerUrl())
-            .ioThreads(1)
-            .listenerThreads(1)
-            .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
index c49f46a..03124db 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
@@ -34,9 +34,7 @@ public class PulsarTestSupport extends ContainerAwareTestSupport {
     }
 
     public static GenericContainer pulsarContainer() {
-        return new GenericContainer(CONTAINER_IMAGE)
-            .withNetworkAliases(CONTAINER_NAME)
-            .withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT)
+        return new GenericContainer(CONTAINER_IMAGE).withNetworkAliases(CONTAINER_NAME).withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT)
             .withCommand("/pulsar/bin/pulsar", "standalone", "--no-functions-worker", "-nss")
             .waitingFor(Wait.forHttp(WAIT_FOR_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
     }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
index b93ba10..b4c9dad 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
@@ -66,8 +66,8 @@ public class PulsarUtilsTest {
         doThrow(new PulsarClientException("A Pulsar Client exception occurred")).when(consumer).close();
 
         consumer.close();
-        
+
         verify(consumer).unsubscribe();
         verify(consumer).close();
     }
-}
\ No newline at end of file
+}


[camel] 05/05: Regen docs for Camel-Pulsar on the website side

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 61255008a90f7a62a34a0f9dbe97ec9c97f952fb
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Sep 20 12:42:59 2019 +0200

    Regen docs for Camel-Pulsar on the website side
---
 docs/components/modules/ROOT/pages/pulsar-component.adoc | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/docs/components/modules/ROOT/pages/pulsar-component.adoc b/docs/components/modules/ROOT/pages/pulsar-component.adoc
index c818ece..6a61096 100644
--- a/docs/components/modules/ROOT/pages/pulsar-component.adoc
+++ b/docs/components/modules/ROOT/pages/pulsar-component.adoc
@@ -70,7 +70,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (25 parameters):
+=== Query Parameters (27 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -97,6 +97,8 @@ with the following path and query parameters:
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *maxPendingMessages* (producer) | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. Default is 1000. | 1000 | int
 | *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
+| *messageRouter* (producer) | Set a custom Message Router. |  | MessageRouter
+| *messageRoutingMode* (producer) | Set the message routing mode for the producer. | RoundRobinPartition | MessageRoutingMode
 | *producerName* (producer) | Name of the producer | default-producer | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean


[camel] 01/05: CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option to producer, messageRouting mode option

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f8a5b7b5fb728649164280c102b63ccbd5f17539
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Sep 20 12:31:27 2019 +0200

    CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option to producer, messageRouting mode option
---
 .../src/main/docs/pulsar-component.adoc            |  3 +-
 .../camel/component/pulsar/PulsarProducer.java     |  3 +-
 .../pulsar/configuration/PulsarConfiguration.java  | 16 ++++++++++
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 36 ++++++++++++++++++++++
 4 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 605bff6..fa40b34 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -69,7 +69,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (25 parameters):
+=== Query Parameters (26 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -96,6 +96,7 @@ with the following path and query parameters:
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *maxPendingMessages* (producer) | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. Default is 1000. | 1000 | int
 | *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
+| *messageRoutingMode* (producer) | Set the message routing mode for the producer. | RoundRobinPartition | MessageRoutingMode
 | *producerName* (producer) | Name of the producer | default-producer | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 2b4eaf3..f4a5646 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -73,7 +73,8 @@ public class PulsarProducer extends DefaultProducer {
                     .batchingMaxMessages(configuration.getMaxPendingMessages())
                     .enableBatching(configuration.isBatchingEnabled())
                     .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType());
+                    .compressionType(configuration.getCompressionType())
+                    .messageRoutingMode(configuration.getMessageRoutingMode());
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 223c124..4366166 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
 
@@ -69,6 +70,8 @@ public class PulsarConfiguration {
     private long initialSequenceId = -1;
     @UriParam(label = "producer", description = "Compression type to use", defaultValue = "NONE")
     private CompressionType compressionType = CompressionType.NONE;
+    @UriParam(label = "producer", description = "MessageRoutingMode", defaultValue = "RoundRobinPartition")
+    private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -305,4 +308,17 @@ public class PulsarConfiguration {
     public CompressionType getCompressionType() {
         return compressionType;
     }
+
+    /**
+     * Set the message routing mode for the producer.
+     */
+	public MessageRoutingMode getMessageRoutingMode() {
+		return messageRoutingMode;
+	}
+
+	public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
+		this.messageRoutingMode = messageRoutingMode;
+	}
+    
+
 }
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index 1e770ca..943e2f1 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -667,6 +667,32 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
+         * Set the message routing mode for the producer.
+         * 
+         * The option is a:
+         * <code>org.apache.pulsar.client.api.MessageRoutingMode</code> type.
+         * 
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder messageRoutingMode(
+                MessageRoutingMode messageRoutingMode) {
+            doSetProperty("messageRoutingMode", messageRoutingMode);
+            return this;
+        }
+        /**
+         * Set the message routing mode for the producer.
+         * 
+         * The option will be converted to a
+         * <code>org.apache.pulsar.client.api.MessageRoutingMode</code> type.
+         * 
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder messageRoutingMode(
+                String messageRoutingMode) {
+            doSetProperty("messageRoutingMode", messageRoutingMode);
+            return this;
+        }
+        /**
          * Name of the producer.
          * 
          * The option is a: <code>java.lang.String</code> type.
@@ -857,6 +883,16 @@ public interface PulsarEndpointBuilderFactory {
         ZSTD,
         SNAPPY;
     }
+
+    /**
+     * Proxy enum for
+     * <code>org.apache.pulsar.client.api.MessageRoutingMode</code> enum.
+     */
+    enum MessageRoutingMode {
+        SinglePartition,
+        RoundRobinPartition,
+        CustomPartition;
+    }
     /**
      * Apache Pulsar (camel-pulsar)
      * Camel Apache Pulsar Component


[camel] 02/05: CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option to producer, messageRouter option

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 56cb7e3f0496cd757726cadbdb3065ab9729daea
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Sep 20 12:37:37 2019 +0200

    CAMEL-14001 - Camel-Pulsar: Add MessageRoutingMode and MessageRouter option to producer, messageRouter option
---
 .../src/main/docs/pulsar-component.adoc            |  3 ++-
 .../camel/component/pulsar/PulsarProducer.java     |  9 ++++++--
 .../pulsar/configuration/PulsarConfiguration.java  | 14 +++++++++++++
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 24 ++++++++++++++++++++++
 4 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index fa40b34..3e19465 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -69,7 +69,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (26 parameters):
+=== Query Parameters (27 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -96,6 +96,7 @@ with the following path and query parameters:
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *maxPendingMessages* (producer) | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. Default is 1000. | 1000 | int
 | *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
+| *messageRouter* (producer) | Set a custom Message Router. |  | MessageRouter
 | *messageRoutingMode* (producer) | Set the message routing mode for the producer. | RoundRobinPartition | MessageRoutingMode
 | *producerName* (producer) | Name of the producer | default-producer | String
 | *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index f4a5646..d7c8b0a 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -25,6 +25,7 @@ import org.apache.camel.TypeConversionException;
 import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 
@@ -73,8 +74,12 @@ public class PulsarProducer extends DefaultProducer {
                     .batchingMaxMessages(configuration.getMaxPendingMessages())
                     .enableBatching(configuration.isBatchingEnabled())
                     .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType())
-                    .messageRoutingMode(configuration.getMessageRoutingMode());
+                    .compressionType(configuration.getCompressionType());
+            if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
+                producerBuilder.messageRouter(configuration.getMessageRouter());
+            } else {
+            	producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+            }
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 4366166..ea34312 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
@@ -72,6 +73,8 @@ public class PulsarConfiguration {
     private CompressionType compressionType = CompressionType.NONE;
     @UriParam(label = "producer", description = "MessageRoutingMode", defaultValue = "RoundRobinPartition")
     private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;
+    @UriParam(label = "producer", description = "Custom Message Router")
+    private MessageRouter messageRouter;
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -319,6 +322,17 @@ public class PulsarConfiguration {
 	public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
 		this.messageRoutingMode = messageRoutingMode;
 	}
+
+    /**
+     * Set a custom Message Router.
+     */
+	public MessageRouter getMessageRouter() {
+		return messageRouter;
+	}
+
+	public void setMessageRouter(MessageRouter messageRouter) {
+		this.messageRouter = messageRouter;
+	}
     
 
 }
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index 943e2f1..ace4f65 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -667,6 +667,30 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
+         * Set a custom Message Router.
+         * 
+         * The option is a:
+         * <code>org.apache.pulsar.client.api.MessageRouter</code> type.
+         * 
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder messageRouter(Object messageRouter) {
+            doSetProperty("messageRouter", messageRouter);
+            return this;
+        }
+        /**
+         * Set a custom Message Router.
+         * 
+         * The option will be converted to a
+         * <code>org.apache.pulsar.client.api.MessageRouter</code> type.
+         * 
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder messageRouter(String messageRouter) {
+            doSetProperty("messageRouter", messageRouter);
+            return this;
+        }
+        /**
          * Set the message routing mode for the producer.
          * 
          * The option is a:


[camel] 04/05: Camel-Pulsar: Regen

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f6d1cb164364c864f8fc6a64974e186cf07600fa
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Sep 20 12:39:27 2019 +0200

    Camel-Pulsar: Regen
---
 .../camel/component/pulsar/configuration/PulsarConfiguration.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index b60bfed..50b9778 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -70,9 +70,9 @@ public class PulsarConfiguration {
     private long initialSequenceId = -1;
     @UriParam(label = "producer", description = "Compression type to use", defaultValue = "NONE")
     private CompressionType compressionType = CompressionType.NONE;
-    @UriParam(label = "producer", description = "MessageRoutingMode", defaultValue = "RoundRobinPartition")
+    @UriParam(label = "producer", description = "Message Routing Mode to use", defaultValue = "RoundRobinPartition")
     private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;
-    @UriParam(label = "producer", description = "Custom Message Router")
+    @UriParam(label = "producer", description = "Custom Message Router to use")
     private MessageRouter messageRouter;
 
     public String getSubscriptionName() {