You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/09/20 10:44:41 UTC
[camel] 03/05: Camel-Pulsar: Fixed CS
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 10de4bab2e76046b9535128bf38bf67b80c3f3f4
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Sep 20 12:38:40 2019 +0200
Camel-Pulsar: Fixed CS
---
.../camel/component/pulsar/PulsarComponent.java | 12 ++-
.../camel/component/pulsar/PulsarEndpoint.java | 14 ++--
.../component/pulsar/PulsarMessageListener.java | 6 +-
.../component/pulsar/PulsarMessageReceipt.java | 17 ++--
.../pulsar/PulsarMessageReceiptFactory.java | 9 ++-
.../camel/component/pulsar/PulsarProducer.java | 24 ++----
.../pulsar/configuration/PulsarConfiguration.java | 92 +++++++++++-----------
.../camel/component/pulsar/utils/PulsarPath.java | 2 +-
.../consumers/CommonCreationStrategyImpl.java | 5 +-
.../pulsar/utils/message/PulsarMessageUtils.java | 2 +-
.../pulsar/PulsarConcurrentConsumerInTest.java | 4 +-
.../pulsar/PulsarConcurrentProducerInTest.java | 4 +-
.../pulsar/PulsarConsumerAcknowledgementTest.java | 31 ++------
.../component/pulsar/PulsarConsumerInTest.java | 19 ++---
.../PulsarConsumerNoAcknowledgementTest.java | 22 ++----
.../pulsar/PulsarCustomMessageReceiptTest.java | 27 ++-----
.../component/pulsar/PulsarProducerInTest.java | 16 +---
.../camel/component/pulsar/PulsarTestSupport.java | 4 +-
.../component/pulsar/utils/PulsarUtilsTest.java | 4 +-
19 files changed, 134 insertions(+), 180 deletions(-)
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
index b43ab1d..08e6f65 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
@@ -108,9 +108,12 @@ public class PulsarComponent extends DefaultComponent {
/**
* Whether to allow manual message acknowledgements.
* <p/>
- * If this option is enabled, then messages are not immediately acknowledged after being consumed.
- * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
- * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+ * If this option is enabled, then messages are not immediately acknowledged
+ * after being consumed. Instead, an instance of
+ * {@link PulsarMessageReceipt} is stored as a header on the
+ * {@link org.apache.camel.Exchange}. Messages can then be acknowledged
+ * using {@link PulsarMessageReceipt} at any time before the ackTimeout
+ * occurs.
*/
public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
this.allowManualAcknowledgement = allowManualAcknowledgement;
@@ -121,7 +124,8 @@ public class PulsarComponent extends DefaultComponent {
}
/**
- * Provide a factory to create an alternate implementation of {@link PulsarMessageReceipt}.
+ * Provide a factory to create an alternate implementation of
+ * {@link PulsarMessageReceipt}.
*/
public void setPulsarMessageReceiptFactory(PulsarMessageReceiptFactory pulsarMessageReceiptFactory) {
this.pulsarMessageReceiptFactory = pulsarMessageReceiptFactory;
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
index 0330677..5ab8bbe 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
@@ -34,13 +34,17 @@ public class PulsarEndpoint extends DefaultEndpoint {
private PulsarClient pulsarClient;
private String uri;
- @UriPath(enums = "persistent,non-persistent") @Metadata(required = true)
+ @UriPath(enums = "persistent,non-persistent")
+ @Metadata(required = true)
private String persistence;
- @UriPath @Metadata(required = true)
+ @UriPath
+ @Metadata(required = true)
private String tenant;
- @UriPath @Metadata(required = true)
+ @UriPath
+ @Metadata(required = true)
private String namespace;
- @UriPath @Metadata(required = true)
+ @UriPath
+ @Metadata(required = true)
private String topic;
@UriParam
private PulsarConfiguration pulsarConfiguration;
@@ -140,6 +144,6 @@ public class PulsarEndpoint extends DefaultEndpoint {
@Override
public PulsarComponent getComponent() {
- return (PulsarComponent) super.getComponent();
+ return (PulsarComponent)super.getComponent();
}
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 55249fc..11e87fb 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -47,8 +47,7 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
try {
if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
- exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
- endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
+ exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
processor.process(exchange);
} else {
processor.process(exchange);
@@ -60,8 +59,7 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
}
private void handleProcessorException(final Exchange exchange, final Exception exception) {
- final Exchange exchangeWithException = PulsarMessageUtils
- .updateExchangeWithException(exception, exchange);
+ final Exchange exchangeWithException = PulsarMessageUtils.updateExchangeWithException(exception, exchange);
exceptionHandler.handleException("An error occurred", exchangeWithException, exception);
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
index 7cdfd90..bb2f849 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
@@ -25,8 +25,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
/**
* Acknowledge the receipt of a message using the Pulsar consumer.
* <p>
- * Available on the {@link Exchange} if {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true.
- * An alternative to the default may be provided by implementing {@link PulsarMessageReceiptFactory}.
+ * Available on the {@link Exchange} if
+ * {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true. An
+ * alternative to the default may be provided by implementing
+ * {@link PulsarMessageReceiptFactory}.
*/
public interface PulsarMessageReceipt {
@@ -38,7 +40,8 @@ public interface PulsarMessageReceipt {
void acknowledge() throws PulsarClientException;
/**
- * Acknowledge receipt of all of the messages in the stream up to and including this message synchronously.
+ * Acknowledge receipt of all of the messages in the stream up to and
+ * including this message synchronously.
*
* @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulative(MessageId)
*/
@@ -52,7 +55,8 @@ public interface PulsarMessageReceipt {
CompletableFuture<Void> acknowledgeAsync();
/**
- * Acknowledge receipt of all of the messages in the stream up to and including this message asynchronously.
+ * Acknowledge receipt of all of the messages in the stream up to and
+ * including this message asynchronously.
*
* @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulativeAsync(MessageId)
*/
@@ -62,9 +66,10 @@ public interface PulsarMessageReceipt {
* Acknowledge the failure to process this message.
*
* @see org.apache.pulsar.client.api.Consumer#negativeAcknowledge(MessageId)
- * Note: Available in Puslar 2.4.0. Implementations with earlier versions should return an {@link java.lang.UnsupportedOperationException}.
+ * Note: Available in Pulsar 2.4.0. Implementations with earlier
+ * versions should throw an
+ * {@link java.lang.UnsupportedOperationException}.
*/
void negativeAcknowledge();
}
-
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
index 314ae95..0a09723 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
@@ -21,10 +21,13 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
/**
- * Factory to create a new {@link PulsarMessageReceipt} to store on the {@link Exchange}.
+ * Factory to create a new {@link PulsarMessageReceipt} to store on the
+ * {@link Exchange}.
* <p>
- * Implement this interface if an alternate implementation of {@link PulsarMessageReceipt} is required
- * as newer Pulsar clients may have acknowledgement functionality not yet supported by {@link DefaultPulsarMessageReceipt}.
+ * Implement this interface if an alternate implementation of
+ * {@link PulsarMessageReceipt} is required as newer Pulsar clients may have
+ * acknowledgement functionality not yet supported by
+ * {@link DefaultPulsarMessageReceipt}.
*/
public interface PulsarMessageReceiptFactory {
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index d7c8b0a..5503949 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -44,8 +44,7 @@ public class PulsarProducer extends DefaultProducer {
final Message message = exchange.getIn();
byte[] body;
try {
- body = exchange.getContext().getTypeConverter()
- .mandatoryConvertTo(byte[].class, exchange, message.getBody());
+ body = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, message.getBody());
} catch (NoTypeConversionAvailableException | TypeConversionException exception) {
// fallback to try serialize the data
body = PulsarMessageUtils.serialize(message.getBody());
@@ -61,24 +60,15 @@ public class PulsarProducer extends DefaultProducer {
if (producerName == null) {
producerName = topicUri + "-" + Thread.currentThread().getId();
}
- final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint
- .getPulsarClient()
- .newProducer()
- .producerName(producerName)
- .topic(topicUri)
- .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
- .blockIfQueueFull(configuration.isBlockIfQueueFull())
- .maxPendingMessages(configuration.getMaxPendingMessages())
- .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
- .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
- .batchingMaxMessages(configuration.getMaxPendingMessages())
- .enableBatching(configuration.isBatchingEnabled())
- .initialSequenceId(configuration.getInitialSequenceId())
- .compressionType(configuration.getCompressionType());
+ final ProducerBuilder<byte[]> producerBuilder = pulsarEndpoint.getPulsarClient().newProducer().producerName(producerName).topic(topicUri)
+ .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS).blockIfQueueFull(configuration.isBlockIfQueueFull())
+ .maxPendingMessages(configuration.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+ .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS).batchingMaxMessages(configuration.getMaxPendingMessages())
+ .enableBatching(configuration.isBatchingEnabled()).initialSequenceId(configuration.getInitialSequenceId()).compressionType(configuration.getCompressionType());
if (ObjectHelper.isNotEmpty(configuration.getMessageRouter())) {
producerBuilder.messageRouter(configuration.getMessageRouter());
} else {
- producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
+ producerBuilder.messageRoutingMode(configuration.getMessageRoutingMode());
}
producer = producerBuilder.create();
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index ea34312..b60bfed 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -55,11 +55,10 @@ public class PulsarConfiguration {
private int sendTimeoutMs = 30000;
@UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
private boolean blockIfQueueFull;
- @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true",
- defaultValue = "1000")
+ @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
private int maxPendingMessages = 1000;
@UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if "
- + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+ + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
private int maxPendingMessagesAcrossPartitions = 50000;
@UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
@@ -92,7 +91,8 @@ public class PulsarConfiguration {
}
/**
- * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to EXCLUSIVE
+ * Type of the subscription [EXCLUSIVE|SHARED|FAILOVER], defaults to
+ * EXCLUSIVE
*/
public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
@@ -147,7 +147,8 @@ public class PulsarConfiguration {
}
/**
- * Prefix to add to consumer names when a SHARED or FAILOVER subscription is used
+ * Prefix to add to consumer names when a SHARED or FAILOVER subscription is
+ * used
*/
public void setConsumerNamePrefix(String consumerNamePrefix) {
this.consumerNamePrefix = consumerNamePrefix;
@@ -160,9 +161,12 @@ public class PulsarConfiguration {
/**
* Whether to allow manual message acknowledgements.
* <p/>
- * If this option is enabled, then messages are not immediately acknowledged after being consumed.
- * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
- * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+ * If this option is enabled, then messages are not immediately acknowledged
+ * after being consumed. Instead, an instance of
+ * {@link PulsarMessageReceipt} is stored as a header on the
+ * {@link org.apache.camel.Exchange}. Messages can then be acknowledged
+ * using {@link PulsarMessageReceipt} at any time before the ackTimeout
+ * occurs.
*/
public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
this.allowManualAcknowledgement = allowManualAcknowledgement;
@@ -184,15 +188,15 @@ public class PulsarConfiguration {
}
/**
- * Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100
+ * Group the consumer acknowledgments for the specified time in milliseconds
+ * - defaults to 100
*/
public void setAckGroupTimeMillis(long ackGroupTimeMillis) {
this.ackGroupTimeMillis = ackGroupTimeMillis;
}
/**
- * Send timeout in milliseconds.
- * Defaults to 30,000ms (30 seconds)
+ * Send timeout in milliseconds. Defaults to 30,000ms (30 seconds)
*/
public void setSendTimeoutMs(int sendTimeoutMs) {
this.sendTimeoutMs = sendTimeoutMs;
@@ -203,10 +207,10 @@ public class PulsarConfiguration {
}
/**
- * Set whether the send and asyncSend operations should block when the outgoing message queue is full.
- * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left
- * in the pending queue.
- * Default is false.
+ * Set whether the send and asyncSend operations should block when the
+ * outgoing message queue is full. If set to false, send operations will
+ * immediately fail with ProducerQueueIsFullError when there is no space
+ * left in the pending queue. Default is false.
*/
public void setBlockIfQueueFull(boolean blockIfQueueFull) {
this.blockIfQueueFull = blockIfQueueFull;
@@ -217,8 +221,8 @@ public class PulsarConfiguration {
}
/**
- * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
- * Default is 1000.
+ * Set the max size of the queue holding the messages pending to receive an
+ * acknowledgment from the broker. Default is 1000.
*/
public void setMaxPendingMessages(int maxPendingMessages) {
this.maxPendingMessages = maxPendingMessages;
@@ -229,8 +233,8 @@ public class PulsarConfiguration {
}
/**
- * Set the number of max pending messages across all the partitions.
- * Default is 50000.
+ * Set the number of max pending messages across all the partitions. Default
+ * is 50000.
*/
public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
@@ -241,11 +245,12 @@ public class PulsarConfiguration {
}
/**
- * Set the time period within which the messages sent will be batched if batch messages are
- * enabled. If set to a non zero value, messages will be queued until either:
+ * Set the time period within which the messages sent will be batched if
+ * batch messages are enabled. If set to a non zero value, messages will be
+ * queued until either:
* <ul>
- * <li>this time interval expires</li>
- * <li>the max number of messages in a batch is reached
+ * <li>this time interval expires</li>
+ * <li>the max number of messages in a batch is reached
* </ul>
* Default is 1ms.
*/
@@ -258,8 +263,7 @@ public class PulsarConfiguration {
}
/**
- * Set the maximum number of messages permitted in a batch.
- * Default 1,000.
+ * Set the maximum number of messages permitted in a batch. Default 1,000.
*/
public void setBatchingMaxMessages(int batchingMaxMessages) {
this.batchingMaxMessages = batchingMaxMessages;
@@ -270,8 +274,8 @@ public class PulsarConfiguration {
}
/**
- * Control whether automatic batching of messages is enabled for the producer.
- * Default is true.
+ * Control whether automatic batching of messages is enabled for the
+ * producer. Default is true.
*/
public void setBatchingEnabled(boolean batchingEnabled) {
this.batchingEnabled = batchingEnabled;
@@ -282,9 +286,10 @@ public class PulsarConfiguration {
}
/**
- * Set the baseline for the sequence ids for messages published by the producer.
- * First message will be using (initialSequenceId 1) as its sequence id and subsequent messages will be assigned
- * incremental sequence ids, if not otherwise specified.
+ * Set the baseline for the sequence ids for messages published by the
+ * producer. First message will be using (initialSequenceId + 1) as its
+ * sequence id and subsequent messages will be assigned incremental sequence
+ * ids, if not otherwise specified.
*/
public void setInitialSequenceId(long initialSequenceId) {
this.initialSequenceId = initialSequenceId;
@@ -315,24 +320,23 @@ public class PulsarConfiguration {
/**
* Set the message routing mode for the producer.
*/
- public MessageRoutingMode getMessageRoutingMode() {
- return messageRoutingMode;
- }
+ public MessageRoutingMode getMessageRoutingMode() {
+ return messageRoutingMode;
+ }
- public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
- this.messageRoutingMode = messageRoutingMode;
- }
+ public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
+ this.messageRoutingMode = messageRoutingMode;
+ }
/**
* Set a custom Message Router.
*/
- public MessageRouter getMessageRouter() {
- return messageRouter;
- }
-
- public void setMessageRouter(MessageRouter messageRouter) {
- this.messageRouter = messageRouter;
- }
-
+ public MessageRouter getMessageRouter() {
+ return messageRouter;
+ }
+
+ public void setMessageRouter(MessageRouter messageRouter) {
+ this.messageRouter = messageRouter;
+ }
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
index 1af739f..eb3b63e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarPath.java
@@ -58,4 +58,4 @@ public class PulsarPath {
public boolean isAutoConfigurable() {
return autoConfigurable;
}
-}
\ No newline at end of file
+}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index a96e369..de2ff9c 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -24,7 +24,7 @@ import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
import org.apache.pulsar.client.api.ConsumerBuilder;
public final class CommonCreationStrategyImpl {
-
+
private CommonCreationStrategyImpl() {
}
@@ -32,8 +32,7 @@ public final class CommonCreationStrategyImpl {
final PulsarConfiguration endpointConfiguration = pulsarEndpoint.getPulsarConfiguration();
return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
- .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name)
- .ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS)
+ .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS)
.acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS)
.messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
index 8332c2a..860337f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageUtils.java
@@ -35,7 +35,7 @@ import static org.apache.camel.component.pulsar.utils.message.PulsarMessageHeade
import static org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders.TOPIC_NAME;
public final class PulsarMessageUtils {
-
+
private PulsarMessageUtils() {
}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
index a4bb747..c859d30 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentConsumerInTest.java
@@ -45,7 +45,7 @@ public class PulsarConcurrentConsumerInTest extends PulsarTestSupport {
private static final int NUMBER_OF_CONSUMERS = 5;
@EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=5&subscriptionType=Shared"
- + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerNamePrefix=camel-consumer-")
+ + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerNamePrefix=camel-consumer-")
private Endpoint from;
@EndpointInject("mock:result")
@@ -72,7 +72,7 @@ public class PulsarConcurrentConsumerInTest extends PulsarTestSupport {
@Override
protected Registry createCamelRegistry() throws Exception {
SimpleRegistry registry = new SimpleRegistry();
-
+
registerPulsarBeans(registry);
return registry;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
index d6aac83..d3b4622 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConcurrentProducerInTest.java
@@ -43,7 +43,7 @@ public class PulsarConcurrentProducerInTest extends PulsarTestSupport {
private ProducerTemplate producerTemplate;
@EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=3&subscriptionType=Shared" + "&subscriptionName=camel-subscription&consumerQueueSize=5"
- + "&consumerNamePrefix=camel-consumer" + "&producerName=" + PRODUCER)
+ + "&consumerNamePrefix=camel-consumer" + "&producerName=" + PRODUCER)
private Endpoint from;
@EndpointInject("mock:result")
@@ -64,7 +64,7 @@ public class PulsarConcurrentProducerInTest extends PulsarTestSupport {
@Override
protected Registry createCamelRegistry() throws Exception {
SimpleRegistry registry = new SimpleRegistry();
-
+
registerPulsarBeans(registry);
return registry;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 0ce2dcc..e3195b3 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -42,12 +42,8 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
private static final String TOPIC_URI = "persistent://public/default/camel-topic";
private static final String PRODUCER = "camel-producer-1";
- @EndpointInject(uri = "pulsar:" + TOPIC_URI
- + "?numberOfConsumers=1&subscriptionType=Exclusive"
- + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
- + "&allowManualAcknowledgement=true"
- + "&ackTimeoutMillis=1000"
- )
+ @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
private Endpoint from;
@EndpointInject(uri = "mock:result")
@@ -58,10 +54,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
@Before
public void setup() throws Exception {
context.removeRoute("myRoute");
- producer = givenPulsarClient().newProducer(Schema.STRING)
- .producerName(PRODUCER)
- .topic(TOPIC_URI)
- .create();
+ producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
}
@Override
@@ -85,11 +78,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
}
private PulsarClient givenPulsarClient() throws PulsarClientException {
- return new ClientBuilderImpl()
- .serviceUrl(getPulsarBrokerUrl())
- .ioThreads(1)
- .listenerThreads(1)
- .build();
+ return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
}
@Test
@@ -102,8 +91,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
from(from).routeId("myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
- PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
- .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+ PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
receipt.acknowledge();
});
}
@@ -124,8 +112,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
from(from).routeId("myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
- PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
- .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+ PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
try {
CompletableFuture<Void> f = receipt.acknowledgeAsync();
f.get();
@@ -151,8 +138,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
from(from).routeId("myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
- PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
- .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+ PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
// Ack the second message. The first will also be acked.
if (exchange.getIn().getBody().equals("Hello World Again!")) {
receipt.acknowledgeCumulative();
@@ -177,8 +163,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
from(from).routeId("myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
- PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
- .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+ PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
// Ack the second message. The first will also be acked.
if (exchange.getIn().getBody().equals("Hello World Again!")) {
try {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
index 2590290..8ff9e91 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerInTest.java
@@ -43,10 +43,8 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
private static final String TOPIC_URI = "persistent://public/default/camel-topic";
private static final String PRODUCER = "camel-producer-1";
- @EndpointInject("pulsar:" + TOPIC_URI
- + "?numberOfConsumers=1&subscriptionType=Exclusive"
- + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
- )
+ @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
private Endpoint from;
@EndpointInject("mock:result")
@@ -73,7 +71,7 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
@Override
protected Registry createCamelRegistry() throws Exception {
SimpleRegistry registry = new SimpleRegistry();
-
+
registerPulsarBeans(registry);
return registry;
@@ -92,21 +90,14 @@ public class PulsarConsumerInTest extends PulsarTestSupport {
}
private PulsarClient givenPulsarClient() throws PulsarClientException {
- return new ClientBuilderImpl()
- .serviceUrl(getPulsarBrokerUrl())
- .ioThreads(1)
- .listenerThreads(1)
- .build();
+ return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
}
@Test
public void testAMessageToClusterIsConsumed() throws Exception {
to.expectedMessageCount(1);
- Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING)
- .producerName(PRODUCER)
- .topic(TOPIC_URI)
- .create();
+ Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
producer.send("Hello World!");
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
index 1cb5527..f57b4a5 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
@@ -36,11 +36,8 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
private static final String TOPIC_URI = "persistent://public/default/camel-topic";
private static final String PRODUCER = "camel-producer-1";
- @EndpointInject(uri = "pulsar:" + TOPIC_URI
- + "?numberOfConsumers=1&subscriptionType=Exclusive"
- + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
- + "&ackTimeoutMillis=1000"
- )
+ @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&ackTimeoutMillis=1000")
private Endpoint from;
@EndpointInject(uri = "mock:result")
@@ -74,26 +71,21 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
PulsarComponent comp = new PulsarComponent(context);
comp.setAutoConfiguration(autoConfiguration);
comp.setPulsarClient(pulsarClient);
- comp.setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+ comp.setAllowManualAcknowledgement(true); // Set to true here instead of
+ // the endpoint query
+ // parameter.
jndi.bind("pulsar", comp);
}
private PulsarClient givenPulsarClient() throws PulsarClientException {
- return new ClientBuilderImpl()
- .serviceUrl(getPulsarBrokerUrl())
- .ioThreads(1)
- .listenerThreads(1)
- .build();
+ return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
}
@Test
public void testAMessageIsConsumedMultipleTimes() throws Exception {
to.expectedMinimumMessageCount(2);
- Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING)
- .producerName(PRODUCER)
- .topic(TOPIC_URI)
- .create();
+ Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
producer.send("Hello World!");
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
index 24a9132..eea202e 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
@@ -53,12 +53,8 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
public PulsarMessageReceipt mockPulsarMessageReceipt = mock(PulsarMessageReceipt.class);
- @EndpointInject(uri = "pulsar:" + TOPIC_URI
- + "?numberOfConsumers=1&subscriptionType=Exclusive"
- + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
- + "&allowManualAcknowledgement=true"
- + "&ackTimeoutMillis=1000"
- )
+ @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive"
+ + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000")
private Endpoint from;
@EndpointInject(uri = "mock:result")
@@ -68,10 +64,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
@Before
public void setup() throws Exception {
- producer = givenPulsarClient().newProducer(Schema.STRING)
- .producerName(PRODUCER)
- .topic(TOPIC_URI)
- .create();
+ producer = givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
}
@Override
@@ -97,11 +90,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
}
private PulsarClient givenPulsarClient() throws PulsarClientException {
- return new ClientBuilderImpl()
- .serviceUrl(getPulsarBrokerUrl())
- .ioThreads(1)
- .listenerThreads(1)
- .build();
+ return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
}
@Test
@@ -116,8 +105,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
from(from).routeId("myRoute").to(to).process(exchange -> {
LOGGER.info("Processing message {}", exchange.getIn().getBody());
- PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
- .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+ PulsarMessageReceipt receipt = (PulsarMessageReceipt)exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
receipt.acknowledge();
});
}
@@ -127,10 +115,11 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
- // The mock does not actually acknowledge using the pulsar consumer, so the message will be re-consumed
+ // The mock does not actually acknowledge using the pulsar consumer, so
+ // the message will be re-consumed
// after the ackTimeout.
verify(mockPulsarMessageReceipt, atLeast(2)).acknowledge();
verifyNoMoreInteractions(mockPulsarMessageReceipt);
}
-
+
}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
index c66ca1c..52b274e 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerInTest.java
@@ -40,12 +40,8 @@ public class PulsarProducerInTest extends PulsarTestSupport {
@Produce("direct:start")
private ProducerTemplate producerTemplate;
- @EndpointInject("pulsar:" + TOPIC_URI
- + "?numberOfConsumers=1&subscriptionType=Exclusive"
- + "&subscriptionName=camel-subscription&consumerQueueSize=1"
- + "&consumerName=camel-consumer"
- + "&producerName=" + PRODUCER
- )
+ @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + "&subscriptionName=camel-subscription&consumerQueueSize=1"
+ + "&consumerName=camel-consumer" + "&producerName=" + PRODUCER)
private Endpoint from;
@EndpointInject("mock:result")
@@ -66,7 +62,7 @@ public class PulsarProducerInTest extends PulsarTestSupport {
@Override
protected Registry createCamelRegistry() throws Exception {
SimpleRegistry registry = new SimpleRegistry();
-
+
registerPulsarBeans(registry);
return registry;
@@ -85,11 +81,7 @@ public class PulsarProducerInTest extends PulsarTestSupport {
}
private PulsarClient givenPulsarClient() throws PulsarClientException {
- return new ClientBuilderImpl()
- .serviceUrl(getPulsarBrokerUrl())
- .ioThreads(1)
- .listenerThreads(1)
- .build();
+ return new ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
}
@Test
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
index c49f46a..03124db 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarTestSupport.java
@@ -34,9 +34,7 @@ public class PulsarTestSupport extends ContainerAwareTestSupport {
}
public static GenericContainer pulsarContainer() {
- return new GenericContainer(CONTAINER_IMAGE)
- .withNetworkAliases(CONTAINER_NAME)
- .withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT)
+ return new GenericContainer(CONTAINER_IMAGE).withNetworkAliases(CONTAINER_NAME).withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT)
.withCommand("/pulsar/bin/pulsar", "standalone", "--no-functions-worker", "-nss")
.waitingFor(Wait.forHttp(WAIT_FOR_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
index b93ba10..b4c9dad 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/utils/PulsarUtilsTest.java
@@ -66,8 +66,8 @@ public class PulsarUtilsTest {
doThrow(new PulsarClientException("A Pulsar Client exception occurred")).when(consumer).close();
consumer.close();
-
+
verify(consumer).unsubscribe();
verify(consumer).close();
}
-}
\ No newline at end of file
+}