You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/05/24 12:52:41 UTC
[camel] 02/06: CAMEL-13570 - Fixed CS
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9b3fec8ee3a5c6e4c27732936371d9d86bf886b3
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri May 24 14:02:27 2019 +0200
CAMEL-13570 - Fixed CS
---
.../camel/component/aws/sqs/SqsConfiguration.java | 105 ++++++++++++---------
.../camel/component/aws/sqs/SqsConsumer.java | 48 +++++-----
2 files changed, 85 insertions(+), 68 deletions(-)
diff --git a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index 3a17d7a..4305a01 100644
--- a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -96,13 +96,13 @@ public class SqsConfiguration implements Cloneable {
private Integer receiveMessageWaitTimeSeconds;
@UriParam(label = "queue")
private String policy;
-
+
// dead letter queue properties
@UriParam(label = "queue")
private String redrivePolicy;
/**
- * Whether or not the queue is a FIFO queue
+ * Whether or not the queue is a FIFO queue
*/
boolean isFifoQueue() {
// AWS docs suggest this is valid derivation.
@@ -184,10 +184,12 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved
- * by a ReceiveMessage request to set in the com.amazonaws.services.sqs.model.SetQueueAttributesRequest.
- * This only make sense if its different from defaultVisibilityTimeout.
- * It changes the queue visibility timeout attribute permanently.
+ * The duration (in seconds) that the received messages are hidden from
+ * subsequent retrieve requests after being retrieved by a ReceiveMessage
+ * request to set in the
+ * com.amazonaws.services.sqs.model.SetQueueAttributesRequest. This only
+ * makes sense if it is different from defaultVisibilityTimeout. It changes the
+ * queue visibility timeout attribute permanently.
*/
public void setVisibilityTimeout(Integer visibilityTimeout) {
this.visibilityTimeout = visibilityTimeout;
@@ -198,7 +200,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * A list of attribute names to receive when consuming. Multiple names can be separated by comma.
+ * A list of attribute names to receive when consuming. Multiple names can
+ * be separated by comma.
*/
public void setAttributeNames(String attributeNames) {
this.attributeNames = attributeNames;
@@ -209,7 +212,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * A list of message attribute names to receive when consuming. Multiple names can be separated by comma.
+ * A list of message attribute names to receive when consuming. Multiple
+ * names can be separated by comma.
*/
public void setMessageAttributeNames(String messageAttributeNames) {
this.messageAttributeNames = messageAttributeNames;
@@ -242,7 +246,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Define if you want to apply delaySeconds option to the queue or on single messages
+ * Define if you want to apply delaySeconds option to the queue or on single
+ * messages
*/
public void setDelayQueue(boolean delayQueue) {
this.delayQueue = delayQueue;
@@ -253,7 +258,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * The maximumMessageSize (in bytes) an SQS message can contain for this queue.
+ * The maximumMessageSize (in bytes) an SQS message can contain for this
+ * queue.
*/
public void setMaximumMessageSize(Integer maximumMessageSize) {
this.maximumMessageSize = maximumMessageSize;
@@ -264,7 +270,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * The messageRetentionPeriod (in seconds) a message will be retained by SQS for this queue.
+ * The messageRetentionPeriod (in seconds) a message will be retained by SQS
+ * for this queue.
*/
public void setMessageRetentionPeriod(Integer messageRetentionPeriod) {
this.messageRetentionPeriod = messageRetentionPeriod;
@@ -286,7 +293,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Specify the policy that send message to DeadLetter queue. See detail at Amazon docs.
+ * Specify the policy that sends messages to the DeadLetter queue. See
+ * details at Amazon docs.
*/
public void setRedrivePolicy(String redrivePolicy) {
this.redrivePolicy = redrivePolicy;
@@ -297,8 +305,9 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * If enabled then a scheduled background task will keep extending the message visibility on SQS.
- * This is needed if it takes a long time to process the message. If set to true defaultVisibilityTimeout must be set.
+ * If enabled then a scheduled background task will keep extending the
+ * message visibility on SQS. This is needed if it takes a long time to
+ * process the message. If set to true defaultVisibilityTimeout must be set.
* See details at Amazon docs.
*/
public void setExtendMessageVisibility(boolean extendMessageVisibility) {
@@ -310,7 +319,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * If you do not specify WaitTimeSeconds in the request, the queue attribute ReceiveMessageWaitTimeSeconds is used to determine how long to wait.
+ * If you do not specify WaitTimeSeconds in the request, the queue attribute
+ * ReceiveMessageWaitTimeSeconds is used to determine how long to wait.
*/
public void setReceiveMessageWaitTimeSeconds(Integer receiveMessageWaitTimeSeconds) {
this.receiveMessageWaitTimeSeconds = receiveMessageWaitTimeSeconds;
@@ -321,7 +331,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Duration in seconds (0 to 20) that the ReceiveMessage action call will wait until a message is in the queue to include in the response.
+ * Duration in seconds (0 to 20) that the ReceiveMessage action call will
+ * wait until a message is in the queue to include in the response.
*/
public void setWaitTimeSeconds(Integer waitTimeSeconds) {
this.waitTimeSeconds = waitTimeSeconds;
@@ -332,7 +343,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Specify the queue owner aws account id when you need to connect the queue with different account owner.
+ * Specify the queue owner AWS account ID when you need to connect to a
+ * queue with a different account owner.
*/
public void setQueueOwnerAWSAccountId(String queueOwnerAWSAccountId) {
this.queueOwnerAWSAccountId = queueOwnerAWSAccountId;
@@ -343,8 +355,10 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Whether or not to send the DeleteMessage to the SQS queue if an exchange fails to get through a filter.
- * If 'false' and exchange does not make it through a Camel filter upstream in the route, then don't send DeleteMessage.
+ * Whether or not to send the DeleteMessage to the SQS queue if an exchange
+ * fails to get through a filter. If 'false' and exchange does not make it
+ * through a Camel filter upstream in the route, then don't send
+ * DeleteMessage.
*/
public void setDeleteIfFiltered(boolean deleteIfFiltered) {
this.deleteIfFiltered = deleteIfFiltered;
@@ -355,7 +369,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Specify the queue region which could be used with queueOwnerAWSAccountId to build the service URL.
+ * Specify the queue region which could be used with queueOwnerAWSAccountId
+ * to build the service URL.
*/
public void setRegion(String region) {
this.region = region;
@@ -366,7 +381,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Allows you to use multiple threads to poll the sqs queue to increase throughput
+ * Allows you to use multiple threads to poll the sqs queue to increase
+ * throughput
*/
public void setConcurrentConsumers(int concurrentConsumers) {
this.concurrentConsumers = concurrentConsumers;
@@ -377,8 +393,9 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * To define the queueUrl explicitly. All other parameters, which would influence the queueUrl, are ignored.
- * This parameter is intended to be used, to connect to a mock implementation of SQS, for testing purposes.
+ * To define the queueUrl explicitly. All other parameters, which would
+ * influence the queueUrl, are ignored. This parameter is intended to be
+ * used to connect to a mock implementation of SQS for testing purposes.
*/
public void setQueueUrl(String queueUrl) {
this.queueUrl = queueUrl;
@@ -411,7 +428,8 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a custom CMK.
+ * The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a
+ * custom CMK.
*/
public void setKmsMasterKeyId(String kmsMasterKeyId) {
this.kmsMasterKeyId = kmsMasterKeyId;
@@ -422,9 +440,10 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt
- * messages before calling AWS KMS again. An integer representing seconds, between 60 seconds (1 minute)
- * and 86,400 seconds (24 hours). Default: 300 (5 minutes).
+ * The length of time, in seconds, for which Amazon SQS can reuse a data key
+ * to encrypt or decrypt messages before calling AWS KMS again. An integer
+ * representing seconds, between 60 seconds (1 minute) and 86,400 seconds
+ * (24 hours). Default: 300 (5 minutes).
*/
public void setKmsDataKeyReusePeriodSeconds(Integer kmsDataKeyReusePeriodSeconds) {
this.kmsDataKeyReusePeriodSeconds = kmsDataKeyReusePeriodSeconds;
@@ -442,9 +461,10 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Only for FIFO queues. Strategy for setting the messageGroupId on the message.
- * Can be one of the following options: *useConstant*, *useExchangeId*, *usePropertyValue*.
- * For the *usePropertyValue* option, the value of property "CamelAwsMessageGroupId" will be used.
+ * Only for FIFO queues. Strategy for setting the messageGroupId on the
+ * message. Can be one of the following options: *useConstant*,
+ * *useExchangeId*, *usePropertyValue*. For the *usePropertyValue* option,
+ * the value of property "CamelAwsMessageGroupId" will be used.
*/
public void setMessageGroupIdStrategy(String strategy) {
if ("useConstant".equalsIgnoreCase(strategy)) {
@@ -467,9 +487,10 @@ public class SqsConfiguration implements Cloneable {
}
/**
- * Only for FIFO queues. Strategy for setting the messageDeduplicationId on the message.
- * Can be one of the following options: *useExchangeId*, *useContentBasedDeduplication*.
- * For the *useContentBasedDeduplication* option, no messageDeduplicationId will be set on the message.
+ * Only for FIFO queues. Strategy for setting the messageDeduplicationId on
+ * the message. Can be one of the following options: *useExchangeId*,
+ * *useContentBasedDeduplication*. For the *useContentBasedDeduplication*
+ * option, no messageDeduplicationId will be set on the message.
*/
public void setMessageDeduplicationIdStrategy(String strategy) {
if ("useExchangeId".equalsIgnoreCase(strategy)) {
@@ -480,7 +501,7 @@ public class SqsConfiguration implements Cloneable {
throw new IllegalArgumentException("Unrecognised MessageDeduplicationIdStrategy: " + strategy);
}
}
-
+
public SqsOperations getOperation() {
return operation;
}
@@ -491,23 +512,23 @@ public class SqsConfiguration implements Cloneable {
public void setOperation(SqsOperations operation) {
this.operation = operation;
}
-
+
public boolean isAutoCreateQueue() {
- return autoCreateQueue;
- }
+ return autoCreateQueue;
+ }
/**
* Setting the autocreation of the queue
*/
- public void setAutoCreateQueue(boolean autoCreateQueue) {
- this.autoCreateQueue = autoCreateQueue;
- }
-
+ public void setAutoCreateQueue(boolean autoCreateQueue) {
+ this.autoCreateQueue = autoCreateQueue;
+ }
+
// *************************************************
//
// *************************************************
- public SqsConfiguration copy() {
+ public SqsConfiguration copy() {
try {
return (SqsConfiguration)super.clone();
} catch (CloneNotSupportedException e) {
diff --git a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index b6892be..be11e6b 100644
--- a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -52,7 +52,7 @@ import org.apache.camel.util.URISupport;
* <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
*/
public class SqsConsumer extends ScheduledBatchPollingConsumer {
-
+
private ScheduledExecutorService scheduledExecutor;
private transient String sqsConsumerToString;
private Collection<String> attributeNames;
@@ -76,7 +76,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
// must reset for each poll
shutdownRunningTask = null;
pendingExchanges = 0;
-
+
ReceiveMessageRequest request = new ReceiveMessageRequest(getQueueUrl());
request.setMaxNumberOfMessages(getMaxMessagesPerPoll() > 0 ? getMaxMessagesPerPoll() : null);
request.setVisibilityTimeout(getConfiguration().getVisibilityTimeout() != null ? getConfiguration().getVisibilityTimeout() : null);
@@ -90,7 +90,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
}
log.trace("Receiving messages with request [{}]...", request);
-
+
ReceiveMessageResult messageResult = null;
try {
messageResult = getClient().receiveMessage(request);
@@ -103,16 +103,16 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
if (log.isTraceEnabled()) {
log.trace("Received {} messages", messageResult.getMessages().size());
}
-
+
Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
return processBatch(CastUtils.cast(exchanges));
}
public void reConnectToQueue() {
try {
- if (getEndpoint().getConfiguration().isAutoCreateQueue()) {
+ if (getEndpoint().getConfiguration().isAutoCreateQueue()) {
getEndpoint().createQueue(getClient());
- }
+ }
} catch (QueueDeletedRecentlyException qdr) {
log.debug("Queue recently deleted, will retry in 30 seconds.");
try {
@@ -125,12 +125,12 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
log.warn("Could not connect to queue in amazon.", e);
}
}
-
+
protected Queue<Exchange> createExchanges(List<Message> messages) {
if (log.isTraceEnabled()) {
log.trace("Received {} messages in this poll", messages.size());
}
-
+
Queue<Exchange> answer = new LinkedList<>();
for (Message message : messages) {
Exchange exchange = getEndpoint().createExchange(message);
@@ -139,7 +139,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
return answer;
}
-
+
public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
@@ -162,10 +162,10 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
int repeatSeconds = Double.valueOf(visibilityTimeout.doubleValue() * 1.5).intValue();
if (log.isDebugEnabled()) {
log.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
- new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()});
+ new Object[] {delay, period, repeatSeconds, exchange.getExchangeId()});
}
- final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
- new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS);
+ final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(new TimeoutExtender(exchange, repeatSeconds), delay, period,
+ TimeUnit.SECONDS);
exchange.addOnCompletion(new Synchronization() {
@Override
public void onComplete(Exchange exchange) {
@@ -212,7 +212,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
return total;
}
-
+
/**
* Strategy to delete the message after being processed.
*
@@ -237,12 +237,9 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
}
private boolean shouldDelete(Exchange exchange) {
- boolean shouldDeleteByFilter = exchange.getProperty(Exchange.FILTER_MATCHED) != null
- && getConfiguration().isDeleteIfFiltered()
- && passedThroughFilter(exchange);
+ boolean shouldDeleteByFilter = exchange.getProperty(Exchange.FILTER_MATCHED) != null && getConfiguration().isDeleteIfFiltered() && passedThroughFilter(exchange);
- return getConfiguration().isDeleteAfterRead()
- || shouldDeleteByFilter;
+ return getConfiguration().isDeleteAfterRead() || shouldDeleteByFilter;
}
private boolean passedThroughFilter(Exchange exchange) {
@@ -264,18 +261,18 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
protected SqsConfiguration getConfiguration() {
return getEndpoint().getConfiguration();
}
-
+
protected AmazonSQS getClient() {
return getEndpoint().getClient();
}
-
+
protected String getQueueUrl() {
return getEndpoint().getQueueUrl();
}
-
+
@Override
public SqsEndpoint getEndpoint() {
- return (SqsEndpoint) super.getEndpoint();
+ return (SqsEndpoint)super.getEndpoint();
}
@Override
@@ -318,8 +315,8 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
@Override
public void run() {
- ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(getQueueUrl(),
- exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class), repeatSeconds);
+ ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(getQueueUrl(), exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class),
+ repeatSeconds);
try {
log.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
@@ -330,8 +327,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
} catch (MessageNotInflightException e) {
// Ignore.
} catch (Exception e) {
- log.warn("Extending visibility window failed for exchange " + exchange
- + ". Will not attempt to extend visibility further. This exception will be ignored.", e);
+ log.warn("Extending visibility window failed for exchange " + exchange + ". Will not attempt to extend visibility further. This exception will be ignored.", e);
}
}
}