You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/09/20 10:44:41 UTC

[camel] 03/05: Camel-Pulsar: Fixed CS

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 10de4bab2e76046b9535128bf38bf67b80c3f3f4
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Sep 20 12:38:40 2019 +0200

    Camel-Pulsar: Fixed CS
---
 .../camel/component/pulsar/PulsarComponent.java    | 12 ++-
 .../camel/component/pulsar/PulsarEndpoint.java     | 14 ++--
 .../component/pulsar/PulsarMessageListener.java    |  6 +-
 .../component/pulsar/PulsarMessageReceipt.java     | 17 ++--
 .../pulsar/PulsarMessageReceiptFactory.java        |  9 ++-
 .../camel/component/pulsar/PulsarProducer.java     | 24 ++----
 .../pulsar/configuration/PulsarConfiguration.java  | 92 +++++++++++-----------
 .../camel/component/pulsar/utils/PulsarPath.java   |  2 +-
 .../consumers/CommonCreationStrategyImpl.java      |  5 +-
 .../pulsar/utils/message/PulsarMessageUtils.java   |  2 +-
 .../pulsar/PulsarConcurrentConsumerInTest.java     |  4 +-
 .../pulsar/PulsarConcurrentProducerInTest.java     |  4 +-
 .../pulsar/PulsarConsumerAcknowledgementTest.java  | 31 ++------
 .../component/pulsar/PulsarConsumerInTest.java     | 19 ++---
 .../PulsarConsumerNoAcknowledgementTest.java       | 22 ++----
 .../pulsar/PulsarCustomMessageReceiptTest.java     | 27 ++-----
 .../component/pulsar/PulsarProducerInTest.java     | 16 +---
 .../camel/component/pulsar/PulsarTestSupport.java  |  4 +-
 .../component/pulsar/utils/PulsarUtilsTest.java    |  4 +-
 19 files changed, 134 insertions(+), 180 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
index b43ab1d..08e6f65 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
@@ -108,9 +108,12 @@ public class PulsarComponent extends DefaultComponent {
     /**
      * Whether to allow manual message acknowledgements.
      * <p/>
-     * If this option is enabled, then messages are not immediately acknowledged after being consumed.
-     * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
-     * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+     * If this option is enabled, then messages are not immediately acknowledged
+     * after being consumed. Instead, an instance of
+     * {@link PulsarMessageReceipt} is stored as a header on the
+     * {@link org.apache.camel.Exchange}. Messages can then be acknowledged
+     * using {@link PulsarMessageReceipt} at any time before the ackTimeout
+     * occurs.
      */
     public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
         this.allowManualAcknowledgement = allowManualAcknowledgement;
@@ -121,7 +124,8 @@ public class PulsarComponent extends DefaultComponent {
     }
 
     /**
-     * Provide a factory to create an alternate implementation of {@link PulsarMessageReceipt}.
+     * Provide a factory to create an alternate implementation of
+     * {@link PulsarMessageReceipt}.
      */
     public void setPulsarMessageReceiptFactory(PulsarMessageReceiptFactory pulsarMessageReceiptFactory) {
         this.pulsarMessageReceiptFactory = pulsarMessageReceiptFactory;
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
index 0330677..5ab8bbe 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
@@ -34,13 +34,17 @@ public class PulsarEndpoint extends DefaultEndpoint {
     private PulsarClient pulsarClient;
     private String uri;
 
-    @UriPath(enums = "persistent,non-persistent") @Metadata(required = true)
+    @UriPath(enums = "persistent,non-persistent")
+    @Metadata(required = true)
     private String persistence;
-    @UriPath @Metadata(required = true)
+    @UriPath
+    @Metadata(required = true)
     private String tenant;
-    @UriPath @Metadata(required = true)
+    @UriPath
+    @Metadata(required = true)
     private String namespace;
-    @UriPath @Metadata(required = true)
+    @UriPath
+    @Metadata(required = true)
     private String topic;
     @UriParam
     private PulsarConfiguration pulsarConfiguration;
@@ -140,6 +144,6 @@ public class PulsarEndpoint extends DefaultEndpoint {
 
     @Override
     public PulsarComponent getComponent() {
-        return (PulsarComponent) super.getComponent();
+        return (PulsarComponent)super.getComponent();
     }
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 55249fc..11e87fb 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -47,8 +47,7 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
 
         try {
             if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
-                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
-                        endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
+                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
                 processor.process(exchange);
             } else {
                 processor.process(exchange);
@@ -60,8 +59,7 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
     }
 
     private void handleProcessorException(final Exchange exchange, final Exception exception) {
-        final Exchange exchangeWithException = PulsarMessageUtils
-            .updateExchangeWithException(exception, exchange);
+        final Exchange exchangeWithException = PulsarMessageUtils.updateExchangeWithException(exception, exchange);
 
         exceptionHandler.handleException("An error occurred", exchangeWithException, exception);
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
index 7cdfd90..bb2f849 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
@@ -25,8 +25,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
 /**
  * Acknowledge the receipt of a message using the Pulsar consumer.
  * <p>
- * Available on the {@link Exchange} if {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true.
- * An alternative to the default may be provided by implementing {@link PulsarMessageReceiptFactory}.
+ * Available on the {@link Exchange} if
+ * {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true. An
+ * alternative to the default may be provided by implementing
+ * {@link PulsarMessageReceiptFactory}.
  */
 public interface PulsarMessageReceipt {
 
@@ -38,7 +40,8 @@ public interface PulsarMessageReceipt {
     void acknowledge() throws PulsarClientException;
 
     /**
-     * Acknowledge receipt of all of the messages in the stream up to and including this message synchronously.
+     * Acknowledge receipt of all of the messages in the stream up to and
+     * including this message synchronously.
      *
      * @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulative(MessageId)
      */
@@ -52,7 +55,8 @@ public interface PulsarMessageReceipt {
     CompletableFuture<Void> acknowledgeAsync();
 
     /**
-     * Acknowledge receipt of all of the messages in the stream up to and including this message asynchronously.
+     * Acknowledge receipt of all of the messages in the stream up to and
+     * including this message asynchronously.
      *
      * @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulativeAsync(MessageId)
      */
@@ -62,9 +66,10 @@ public interface PulsarMessageReceipt {
      * Acknowledge the failure to process this message.
      *
      * @see org.apache.pulsar.client.api.Consumer#negativeAcknowledge(MessageId)
-     * Note: Available in Puslar 2.4.0. Implementations with earlier versions should return an {@link java.lang.UnsupportedOperationException}.
+     *      Note: Available in Pulsar 2.4.0. Implementations with earlier
+     *      versions should throw an
+     *      {@link java.lang.UnsupportedOperationException}.
      */
     void negativeAcknowledge();
 
 }
-
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
index 314ae95..0a09723 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
@@ -21,10 +21,13 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 
 /**
- * Factory to create a new {@link PulsarMessageReceipt} to store on the {@link Exchange}.
+ * Factory to create a new {@link PulsarMessageReceipt} to store on the
+ * {@link Exchange}.
  * <p>
- * Implement this interface if an alternate implementation of {@link PulsarMessageReceipt} is required
- * as newer Pulsar clients may have acknowledgement functionality not yet supported by {@link DefaultPulsarMessageReceipt}.
+ * Implement this interface if an alternate implementation of
+ * {@link PulsarMessageReceipt} is required as newer Pulsar clients may have
+ * acknowledgement functionality not yet supported by
+ * {@link DefaultPulsarMessageReceipt}.
  */
 public interface PulsarMessageReceiptFactory {
 
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index d7c8b0a..5503949 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -44,8 +44,7 @@ public class PulsarProducer extends DefaultProducer {
         final Message message = exchange.getIn();
         byte[] body;
         try {
-            body = exchange.getContext().getTypeConverter()
-                    .mandatoryConvertTo(byte[].class, exchange, message.getBody());
+            body = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, message.getBody());
         } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
             // fallback to try serialize the data
             body = PulsarMessageUtils.serialize(message.getBody());
@@ -61,24 +60,15 @@ public class PulsarProducer extends DefaultProducer {
             if (producerName == null) {
                 producerName = topicUri + "-" + Thread.currentThread().getId();
             }
-            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint
-                    .getPulsarClient()
-                    .newProducer()
-                    .producerName(producerName)
-                    .topic(topicUri)
-                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
-                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
-                    .maxPendingMessages(configuration.getMaxPendingMessages())
-                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
-                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
-                    .batchingMaxMessages(configuration.getMaxPendingMessages())
-                    .enableBatching(configuration.isBatchingEnabled())
-                    .initialSequenceId(configuration.getInitialSequenceId())
-                    .compressionType(configuration.getCompressionType());
+            final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().producerName(producerName).topic(topicUri)
+                .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull())
+                .maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages())
+                .enableBatching(configuration.isBatchingEnabled()).initialSequenceId(configuration.getInitialSequenceId()).compressionType(configuration.getCompressionType());
             if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
                 producerBuilder.messageRouter(configuration.getMessageRouter());
             } else {
-            	producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+                producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
             }
             producer = producerBuilder.create();
         }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index ea34312..b60bfed 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -55,11 +55,10 @@ public class PulsarConfiguration {
     private int sendTimeoutMs = 30000;
     @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
     private boolean blockIfQueueFull;
-    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true",
-            defaultValue = "1000")
+    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
     private int maxPendingMessages = 1000;
     @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if "
-            + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+                                                + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
     private int maxPendingMessagesAcrossPartitions = 50000;
     @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
     private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
@@ -92,7 +91,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to EXCLUSIVE
+     * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to
+     * EXCLUSIVE
      */
     public void setSubscriptionType(SubscriptionType subscriptionType) {
         this.subscriptionType = subscriptionType;
@@ -147,7 +147,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Prefix to add to consumer names when a SHARED or FAILOVER subscription is used
+     * Prefix to add to consumer names when a SHARED or FAILOVER subscription is
+     * used
      */
     public void setConsumerNamePrefix(String consumerNamePrefix) {
         this.consumerNamePrefix = consumerNamePrefix;
@@ -160,9 +161,12 @@ public class PulsarConfiguration {
     /**
      * Whether to allow manual message acknowledgements.
      * <p/>
-     * If this option is enabled, then messages are not immediately acknowledged after being consumed.
-     * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
-     * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+     * If this option is enabled, then messages are not immediately acknowledged
+     * after being consumed. Instead, an instance of
+     * {@link PulsarMessageReceipt} is stored as a header on the
+     * {@link org.apache.camel.Exchange}. Messages can then be acknowledged
+     * using {@link PulsarMessageReceipt} at any time before the ackTimeout
+     * occurs.
      */
     public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
         this.allowManualAcknowledgement = allowManualAcknowledgement;
@@ -184,15 +188,15 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100
+     * Group the consumer acknowledgments for the specified time in milliseconds
+     * - defaults to 100
      */
     public void setAckGroupTimeMillis(long ackGroupTimeMillis) {
         this.ackGroupTimeMillis = ackGroupTimeMillis;
     }
 
     /**
-      * Send timeout in milliseconds.
-      * Defaults to 30,000ms (30 seconds)
+     * Send timeout in milliseconds. Defaults to 30,000ms (30 seconds)
      */
     public void setSendTimeoutMs(int sendTimeoutMs) {
         this.sendTimeoutMs = sendTimeoutMs;
@@ -203,10 +207,10 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set whether the send and asyncSend operations should block when the outgoing message queue is full.
-     * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left
-     * in the pending queue.
-     * Default is false.
+     * Set whether the send and asyncSend operations should block when the
+     * outgoing message queue is full. If set to false, send operations will
+     * immediately fail with ProducerQueueIsFullError when there is no space
+     * left in the pending queue. Default is false.
      */
     public void setBlockIfQueueFull(boolean blockIfQueueFull) {
         this.blockIfQueueFull = blockIfQueueFull;
@@ -217,8 +221,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
-     * Default is 1000.
+     * Set the max size of the queue holding the messages pending to receive an
+     * acknowledgment from the broker. Default is 1000.
      */
     public void setMaxPendingMessages(int maxPendingMessages) {
         this.maxPendingMessages = maxPendingMessages;
@@ -229,8 +233,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the number of max pending messages across all the partitions.
-     * Default is 50000.
+     * Set the number of max pending messages across all the partitions. Default
+     * is 50000.
      */
     public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
         this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
@@ -241,11 +245,12 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the time period within which the messages sent will be batched if batch messages are
-     * enabled. If set to a non zero value, messages will be queued until either:
+     * Set the time period within which the messages sent will be batched if
+     * batch messages are enabled. If set to a non zero value, messages will be
+     * queued until either:
      * <ul>
-     *  <li>this time interval expires</li>
-     *  <li>the max number of messages in a batch is reached
+     * <li>this time interval expires</li>
+     * <li>the max number of messages in a batch is reached</li>
      * </ul>
      * Default is 1ms.
      */
@@ -258,8 +263,7 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the maximum number of messages permitted in a batch.
-     * Default 1,000.
+     * Set the maximum number of messages permitted in a batch. Default 1,000.
      */
     public void setBatchingMaxMessages(int batchingMaxMessages) {
         this.batchingMaxMessages = batchingMaxMessages;
@@ -270,8 +274,8 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Control whether automatic batching of messages is enabled for the producer.
-     * Default is true.
+     * Control whether automatic batching of messages is enabled for the
+     * producer. Default is true.
      */
     public void setBatchingEnabled(boolean batchingEnabled) {
         this.batchingEnabled = batchingEnabled;
@@ -282,9 +286,10 @@ public class PulsarConfiguration {
     }
 
     /**
-     * Set the baseline for the sequence ids for messages published by the producer.
-     * First message will be using (initialSequenceId  1) as its sequence id and subsequent messages will be assigned
-     * incremental sequence ids, if not otherwise specified.
+     * Set the baseline for the sequence ids for messages published by the
+     * producer. First message will be using (initialSequenceId + 1) as its
+     * sequence id and subsequent messages will be assigned incremental sequence
+     * ids, if not otherwise specified.
      */
     public void setInitialSequenceId(long initialSequenceId) {
         this.initialSequenceId = initialSequenceId;
@@ -315,24 +320,23 @@ public class PulsarConfiguration {
     /**
      * Set the message routing mode for the producer.
      */
-	public MessageRoutingMode getMessageRoutingMode() {
-		return messageRoutingMode;
-	}
+    public MessageRoutingMode getMessageRoutingMode() {
+        return messageRoutingMode;
+    }
 
-	public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
-		this.messageRoutingMode = messageRoutingMode;
-	}
+    public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
+        this.messageRoutingMode = messageRoutingMode;
+    }
 
     /**
      * Set a custom Message Router.
      */
-	public MessageRouter getMessageRouter() {
-		return messageRouter;
-	}
-
-	public void setMessageRouter(MessageRouter messageRouter) {
-		this.messageRouter = messageRouter;
-	}
-    
+    public MessageRouter getMessageRouter() {
+        return messageRouter;
+    }
+
+    public void setMessageRouter(MessageRouter messageRouter) {
+        this.messageRouter = messageRouter;
+    }
 
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
index 1af739f..eb3b63e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
@@ -58,4 +58,4 @@ public class PulsarPath {
     public boolean isAutoConfigurable() {
         return autoConfigurable;
     }
-}
\ No newline at end of file
+}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index a96e369..de2ff9c 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -24,7 +24,7 @@ import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 
 public final class CommonCreationStrategyImpl {
-    
+
     private CommonCreationStrategyImpl() {
     }
 
@@ -32,8 +32,7 @@ public final class CommonCreationStrategyImpl {
         final PulsarConfiguration endpointConfiguration = pulsarEndpoint.getPulsarConfiguration();
 
         return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
-            .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name)
-            .ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS)
+            .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS)
             .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS)
             .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
index 8332c2a..860337f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
@@ -35,7 +35,7 @@ import static org.apache.camel.component.pulsar.utils.message.PulsarMessageHeade
 import static org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders.TOPIC_NAME;
 
 public final class PulsarMessageUtils {
-    
+
     private PulsarMessageUtils() {
     }
 
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
index a4bb747..c859d30 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
@@ -45,7 +45,7 @@ public class PulsarConcurrentConsumerInTest extends PulsarTestSupport {
     private static final int NUMBER_OF_CONSUMERS = 5;
 
     @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=5&subscriptionType=Shared"
-                          + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerNamePrefix=camel-consumer-")
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerNamePrefix=camel-consumer-")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -72,7 +72,7 @@ public class PulsarConcurrentConsumerInTest extends PulsarTestSupport {
     @Override
     protected Registry createCamelRegistry() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
-        
+
         registerPulsarBeans(registry);
 
         return registry;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
index d6aac83..d3b4622 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
@@ -43,7 +43,7 @@ public class PulsarConcurrentProducerInTest extends PulsarTestSupport {
     private ProducerTemplate producerTemplate;
 
     @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=3&subscriptionType=Shared" + "&subscriptionName=camel-subscription&consumerQueueSize=5"
-                          + "&consumerNamePrefix=camel-consumer" + "&producerName=" + PRODUCER)
+                    + "&consumerNamePrefix=camel-consumer" + "&producerName=" + PRODUCER)
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -64,7 +64,7 @@ public class PulsarConcurrentProducerInTest extends PulsarTestSupport {
     @Override
     protected Registry createCamelRegistry() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
-        
+
         registerPulsarBeans(registry);
 
         return registry;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 0ce2dcc..e3195b3 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -42,12 +42,8 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     private static final String TOPIC_URI = "persistent://public/default/camel-topic";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject(uri = "pulsar:" + TOPIC_URI
-            + "?numberOfConsumers=1&subscriptionType=Exclusive"
-            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-            + "&allowManualAcknowledgement=true"
-            + "&ackTimeoutMillis=1000"
-    )
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                          + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
@@ -58,10 +54,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     @Before
     public void setup() throws Exception {
         context.removeRoute("myRoute");
-        producer = givenPulsarClient().newProducer(Schema.STRING)
-                .producerName(PRODUCER)
-                .topic(TOPIC_URI)
-                .create();
+        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
     }
 
     @Override
@@ -85,11 +78,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-                .serviceUrl(getPulsarBrokerUrl())
-                .ioThreads(1)
-                .listenerThreads(1)
-                .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
@@ -102,8 +91,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     receipt.acknowledge();
                 });
             }
@@ -124,8 +112,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     try {
                         CompletableFuture<Void> f = receipt.acknowledgeAsync();
                         f.get();
@@ -151,8 +138,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     // Ack the second message. The first will also be acked.
                     if (exchange.getIn().getBody().equals("Hello World Again!")) {
                         receipt.acknowledgeCumulative();
@@ -177,8 +163,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     // Ack the second message. The first will also be acked.
                     if (exchange.getIn().getBody().equals("Hello World Again!")) {
                         try {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
index 2590290..8ff9e91 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
@@ -43,10 +43,8 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
     private static final String TOPIC_URI = "persistent://public/default/camel-topic";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject("pulsar:" + TOPIC_URI
-        + "?numberOfConsumers=1&subscriptionType=Exclusive"
-        + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-    )
+    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                    + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -73,7 +71,7 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
     @Override
     protected Registry createCamelRegistry() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
-        
+
         registerPulsarBeans(registry);
 
         return registry;
@@ -92,21 +90,14 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-            .serviceUrl(getPulsarBrokerUrl())
-            .ioThreads(1)
-            .listenerThreads(1)
-            .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
     public void testAMessageToClusterIsConsumed() throws Exception {
         to.expectedMessageCount(1);
 
-        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING)
-            .producerName(PRODUCER)
-            .topic(TOPIC_URI)
-            .create();
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
 
         producer.send("Hello World!");
 
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
index 1cb5527..f57b4a5 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
@@ -36,11 +36,8 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
     private static final String TOPIC_URI = "persistent://public/default/camel-topic";
     private static final String PRODUCER = "camel-producer-1";
 
-    @EndpointInject(uri = "pulsar:" + TOPIC_URI
-            + "?numberOfConsumers=1&subscriptionType=Exclusive"
-            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-            + "&ackTimeoutMillis=1000"
-    )
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                          + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&ackTimeoutMillis=1000")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
@@ -74,26 +71,21 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
         PulsarComponent comp = new PulsarComponent(context);
         comp.setAutoConfiguration(autoConfiguration);
         comp.setPulsarClient(pulsarClient);
-        comp.setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+        comp.setAllowManualAcknowledgement(true); // Set to true here instead of
+                                                  // the endpoint query
+                                                  // parameter.
         jndi.bind("pulsar", comp);
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-                .serviceUrl(getPulsarBrokerUrl())
-                .ioThreads(1)
-                .listenerThreads(1)
-                .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
     public void testAMessageIsConsumedMultipleTimes() throws Exception {
         to.expectedMinimumMessageCount(2);
 
-        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING)
-                .producerName(PRODUCER)
-                .topic(TOPIC_URI)
-                .create();
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
 
         producer.send("Hello World!");
 
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
index 24a9132..eea202e 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
@@ -53,12 +53,8 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
 
     public PulsarMessageReceipt mockPulsarMessageReceipt = mock(PulsarMessageReceipt.class);
 
-    @EndpointInject(uri = "pulsar:" + TOPIC_URI
-            + "?numberOfConsumers=1&subscriptionType=Exclusive"
-            + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
-            + "&allowManualAcknowledgement=true"
-            + "&ackTimeoutMillis=1000"
-    )
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+                          + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
@@ -68,10 +64,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
 
     @Before
     public void setup() throws Exception {
-        producer = givenPulsarClient().newProducer(Schema.STRING)
-                .producerName(PRODUCER)
-                .topic(TOPIC_URI)
-                .create();
+        producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
     }
 
     @Override
@@ -97,11 +90,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-                .serviceUrl(getPulsarBrokerUrl())
-                .ioThreads(1)
-                .listenerThreads(1)
-                .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
@@ -116,8 +105,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
                 from(from).routeId("myRoute").to(to).process(exchange -> {
                     LOGGER.info("Processing message {}", exchange.getIn().getBody());
 
-                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
-                            .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
                     receipt.acknowledge();
                 });
             }
@@ -127,10 +115,11 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
 
         MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
 
-        // The mock does not actually acknowledge using the pulsar consumer, so the message will be re-consumed
+        // The mock does not actually acknowledge using the pulsar consumer, so
+        // the message will be re-consumed
         // after the ackTimeout.
         verify(mockPulsarMessageReceipt, atLeast(2)).acknowledge();
         verifyNoMoreInteractions(mockPulsarMessageReceipt);
     }
-    
+
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
index c66ca1c..52b274e 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
@@ -40,12 +40,8 @@ public class PulsarProducerInTest extends PulsarTestSupport {
     @Produce("direct:start")
     private ProducerTemplate producerTemplate;
 
-    @EndpointInject("pulsar:" + TOPIC_URI
-        + "?numberOfConsumers=1&subscriptionType=Exclusive"
-        + "&subscriptionName=camel-subscription&consumerQueueSize=1"
-        + "&consumerName=camel-consumer"
-        + "&producerName=" + PRODUCER
-    )
+    @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + "&subscriptionName=camel-subscription&consumerQueueSize=1"
+                    + "&consumerName=camel-consumer" + "&producerName=" + PRODUCER)
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -66,7 +62,7 @@ public class PulsarProducerInTest extends PulsarTestSupport {
     @Override
     protected Registry createCamelRegistry() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
-        
+
         registerPulsarBeans(registry);
 
         return registry;
@@ -85,11 +81,7 @@ public class PulsarProducerInTest extends PulsarTestSupport {
     }
 
     private PulsarClient givenPulsarClient() throws PulsarClientException {
-        return new ClientBuilderImpl()
-            .serviceUrl(getPulsarBrokerUrl())
-            .ioThreads(1)
-            .listenerThreads(1)
-            .build();
+        return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
     }
 
     @Test
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
index c49f46a..03124db 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
@@ -34,9 +34,7 @@ public class PulsarTestSupport extends ContainerAwareTestSupport {
     }
 
     public static GenericContainer pulsarContainer() {
-        return new GenericContainer(CONTAINER_IMAGE)
-            .withNetworkAliases(CONTAINER_NAME)
-            .withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT)
+        return new GenericContainer(CONTAINER_IMAGE).withNetworkAliases(CONTAINER_NAME).withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT)
             .withCommand("/pulsar/bin/pulsar", "standalone", "--no-functions-worker", "-nss")
             .waitingFor(Wait.forHttp(WAIT_FOR_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
     }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
index b93ba10..b4c9dad 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
@@ -66,8 +66,8 @@ public class PulsarUtilsTest {
         doThrow(new PulsarClientException("A Pulsar Client exception occurred")).when(consumer).close();
 
         consumer.close();
-        
+
         verify(consumer).unsubscribe();
         verify(consumer).close();
     }
-}
\ No newline at end of file
+}