You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/05/24 12:52:41 UTC

[camel] 02/06: CAMEL-13570 - Fixed CS

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9b3fec8ee3a5c6e4c27732936371d9d86bf886b3
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri May 24 14:02:27 2019 +0200

    CAMEL-13570 - Fixed CS
---
 .../camel/component/aws/sqs/SqsConfiguration.java  | 105 ++++++++++++---------
 .../camel/component/aws/sqs/SqsConsumer.java       |  48 +++++-----
 2 files changed, 85 insertions(+), 68 deletions(-)

diff --git a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index 3a17d7a..4305a01 100644
--- a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -96,13 +96,13 @@ public class SqsConfiguration implements Cloneable {
     private Integer receiveMessageWaitTimeSeconds;
     @UriParam(label = "queue")
     private String policy;
-    
+
     // dead letter queue properties
     @UriParam(label = "queue")
     private String redrivePolicy;
 
     /**
-     *  Whether or not the queue is a FIFO queue
+     * Whether or not the queue is a FIFO queue
      */
     boolean isFifoQueue() {
         // AWS docs suggest this is valid derivation.
@@ -184,10 +184,12 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved
-     * by a ReceiveMessage request to set in the com.amazonaws.services.sqs.model.SetQueueAttributesRequest.
-     * This only make sense if its different from defaultVisibilityTimeout.
-     * It changes the queue visibility timeout attribute permanently.
+     * The duration (in seconds) that the received messages are hidden from
+     * subsequent retrieve requests after being retrieved by a ReceiveMessage
+     * request to set in the
+     * com.amazonaws.services.sqs.model.SetQueueAttributesRequest. This only
+     * make sense if its different from defaultVisibilityTimeout. It changes the
+     * queue visibility timeout attribute permanently.
      */
     public void setVisibilityTimeout(Integer visibilityTimeout) {
         this.visibilityTimeout = visibilityTimeout;
@@ -198,7 +200,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * A list of attribute names to receive when consuming.  Multiple names can be separated by comma.
+     * A list of attribute names to receive when consuming. Multiple names can
+     * be separated by comma.
      */
     public void setAttributeNames(String attributeNames) {
         this.attributeNames = attributeNames;
@@ -209,7 +212,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * A list of message attribute names to receive when consuming. Multiple names can be separated by comma.
+     * A list of message attribute names to receive when consuming. Multiple
+     * names can be separated by comma.
      */
     public void setMessageAttributeNames(String messageAttributeNames) {
         this.messageAttributeNames = messageAttributeNames;
@@ -242,7 +246,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Define if you want to apply delaySeconds option to the queue or on single messages
+     * Define if you want to apply delaySeconds option to the queue or on single
+     * messages
      */
     public void setDelayQueue(boolean delayQueue) {
         this.delayQueue = delayQueue;
@@ -253,7 +258,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * The maximumMessageSize (in bytes) an SQS message can contain for this queue.
+     * The maximumMessageSize (in bytes) an SQS message can contain for this
+     * queue.
      */
     public void setMaximumMessageSize(Integer maximumMessageSize) {
         this.maximumMessageSize = maximumMessageSize;
@@ -264,7 +270,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * The messageRetentionPeriod (in seconds) a message will be retained by SQS for this queue.
+     * The messageRetentionPeriod (in seconds) a message will be retained by SQS
+     * for this queue.
      */
     public void setMessageRetentionPeriod(Integer messageRetentionPeriod) {
         this.messageRetentionPeriod = messageRetentionPeriod;
@@ -286,7 +293,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Specify the policy that send message to DeadLetter queue. See detail at Amazon docs.
+     * Specify the policy that send message to DeadLetter queue. See detail at
+     * Amazon docs.
      */
     public void setRedrivePolicy(String redrivePolicy) {
         this.redrivePolicy = redrivePolicy;
@@ -297,8 +305,9 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * If enabled then a scheduled background task will keep extending the message visibility on SQS.
-     * This is needed if it takes a long time to process the message. If set to true defaultVisibilityTimeout must be set.
+     * If enabled then a scheduled background task will keep extending the
+     * message visibility on SQS. This is needed if it takes a long time to
+     * process the message. If set to true defaultVisibilityTimeout must be set.
      * See details at Amazon docs.
      */
     public void setExtendMessageVisibility(boolean extendMessageVisibility) {
@@ -310,7 +319,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * If you do not specify WaitTimeSeconds in the request, the queue attribute ReceiveMessageWaitTimeSeconds is used to determine how long to wait.
+     * If you do not specify WaitTimeSeconds in the request, the queue attribute
+     * ReceiveMessageWaitTimeSeconds is used to determine how long to wait.
      */
     public void setReceiveMessageWaitTimeSeconds(Integer receiveMessageWaitTimeSeconds) {
         this.receiveMessageWaitTimeSeconds = receiveMessageWaitTimeSeconds;
@@ -321,7 +331,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Duration in seconds (0 to 20) that the ReceiveMessage action call will wait until a message is in the queue to include in the response.
+     * Duration in seconds (0 to 20) that the ReceiveMessage action call will
+     * wait until a message is in the queue to include in the response.
      */
     public void setWaitTimeSeconds(Integer waitTimeSeconds) {
         this.waitTimeSeconds = waitTimeSeconds;
@@ -332,7 +343,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Specify the queue owner aws account id when you need to connect the queue with different account owner.
+     * Specify the queue owner aws account id when you need to connect the queue
+     * with different account owner.
      */
     public void setQueueOwnerAWSAccountId(String queueOwnerAWSAccountId) {
         this.queueOwnerAWSAccountId = queueOwnerAWSAccountId;
@@ -343,8 +355,10 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Whether or not to send the DeleteMessage to the SQS queue if an exchange fails to get through a filter.
-     * If 'false' and exchange does not make it through a Camel filter upstream in the route, then don't send DeleteMessage.
+     * Whether or not to send the DeleteMessage to the SQS queue if an exchange
+     * fails to get through a filter. If 'false' and exchange does not make it
+     * through a Camel filter upstream in the route, then don't send
+     * DeleteMessage.
      */
     public void setDeleteIfFiltered(boolean deleteIfFiltered) {
         this.deleteIfFiltered = deleteIfFiltered;
@@ -355,7 +369,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Specify the queue region which could be used with queueOwnerAWSAccountId to build the service URL.
+     * Specify the queue region which could be used with queueOwnerAWSAccountId
+     * to build the service URL.
      */
     public void setRegion(String region) {
         this.region = region;
@@ -366,7 +381,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Allows you to use multiple threads to poll the sqs queue to increase throughput
+     * Allows you to use multiple threads to poll the sqs queue to increase
+     * throughput
      */
     public void setConcurrentConsumers(int concurrentConsumers) {
         this.concurrentConsumers = concurrentConsumers;
@@ -377,8 +393,9 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * To define the queueUrl explicitly. All other parameters, which would influence the queueUrl, are ignored.
-     * This parameter is intended to be used, to connect to a mock implementation of SQS, for testing purposes.
+     * To define the queueUrl explicitly. All other parameters, which would
+     * influence the queueUrl, are ignored. This parameter is intended to be
+     * used, to connect to a mock implementation of SQS, for testing purposes.
      */
     public void setQueueUrl(String queueUrl) {
         this.queueUrl = queueUrl;
@@ -411,7 +428,8 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a custom CMK.
+     * The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a
+     * custom CMK.
      */
     public void setKmsMasterKeyId(String kmsMasterKeyId) {
         this.kmsMasterKeyId = kmsMasterKeyId;
@@ -422,9 +440,10 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt 
-     * messages before calling AWS KMS again. An integer representing seconds, between 60 seconds (1 minute) 
-     * and 86,400 seconds (24 hours). Default: 300 (5 minutes).
+     * The length of time, in seconds, for which Amazon SQS can reuse a data key
+     * to encrypt or decrypt messages before calling AWS KMS again. An integer
+     * representing seconds, between 60 seconds (1 minute) and 86,400 seconds
+     * (24 hours). Default: 300 (5 minutes).
      */
     public void setKmsDataKeyReusePeriodSeconds(Integer kmsDataKeyReusePeriodSeconds) {
         this.kmsDataKeyReusePeriodSeconds = kmsDataKeyReusePeriodSeconds;
@@ -442,9 +461,10 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Only for FIFO queues. Strategy for setting the messageGroupId on the message.
-     * Can be one of the following options: *useConstant*, *useExchangeId*, *usePropertyValue*.
-     * For the *usePropertyValue* option, the value of property "CamelAwsMessageGroupId" will be used.
+     * Only for FIFO queues. Strategy for setting the messageGroupId on the
+     * message. Can be one of the following options: *useConstant*,
+     * *useExchangeId*, *usePropertyValue*. For the *usePropertyValue* option,
+     * the value of property "CamelAwsMessageGroupId" will be used.
      */
     public void setMessageGroupIdStrategy(String strategy) {
         if ("useConstant".equalsIgnoreCase(strategy)) {
@@ -467,9 +487,10 @@ public class SqsConfiguration implements Cloneable {
     }
 
     /**
-     * Only for FIFO queues. Strategy for setting the messageDeduplicationId on the message.
-     * Can be one of the following options: *useExchangeId*, *useContentBasedDeduplication*.
-     * For the *useContentBasedDeduplication* option, no messageDeduplicationId will be set on the message.
+     * Only for FIFO queues. Strategy for setting the messageDeduplicationId on
+     * the message. Can be one of the following options: *useExchangeId*,
+     * *useContentBasedDeduplication*. For the *useContentBasedDeduplication*
+     * option, no messageDeduplicationId will be set on the message.
      */
     public void setMessageDeduplicationIdStrategy(String strategy) {
         if ("useExchangeId".equalsIgnoreCase(strategy)) {
@@ -480,7 +501,7 @@ public class SqsConfiguration implements Cloneable {
             throw new IllegalArgumentException("Unrecognised MessageDeduplicationIdStrategy: " + strategy);
         }
     }
-    
+
     public SqsOperations getOperation() {
         return operation;
     }
@@ -491,23 +512,23 @@ public class SqsConfiguration implements Cloneable {
     public void setOperation(SqsOperations operation) {
         this.operation = operation;
     }
-    
+
     public boolean isAutoCreateQueue() {
-		return autoCreateQueue;
-	}
+        return autoCreateQueue;
+    }
 
     /**
      * Setting the autocreation of the queue
      */
-	public void setAutoCreateQueue(boolean autoCreateQueue) {
-		this.autoCreateQueue = autoCreateQueue;
-	}
-    
+    public void setAutoCreateQueue(boolean autoCreateQueue) {
+        this.autoCreateQueue = autoCreateQueue;
+    }
+
     // *************************************************
     //
     // *************************************************
 
-	public SqsConfiguration copy() {
+    public SqsConfiguration copy() {
         try {
             return (SqsConfiguration)super.clone();
         } catch (CloneNotSupportedException e) {
diff --git a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index b6892be..be11e6b 100644
--- a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -52,7 +52,7 @@ import org.apache.camel.util.URISupport;
  * <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
  */
 public class SqsConsumer extends ScheduledBatchPollingConsumer {
-    
+
     private ScheduledExecutorService scheduledExecutor;
     private transient String sqsConsumerToString;
     private Collection<String> attributeNames;
@@ -76,7 +76,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
         // must reset for each poll
         shutdownRunningTask = null;
         pendingExchanges = 0;
-        
+
         ReceiveMessageRequest request = new ReceiveMessageRequest(getQueueUrl());
         request.setMaxNumberOfMessages(getMaxMessagesPerPoll() > 0 ? getMaxMessagesPerPoll() : null);
         request.setVisibilityTimeout(getConfiguration().getVisibilityTimeout() != null ? getConfiguration().getVisibilityTimeout() : null);
@@ -90,7 +90,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
         }
 
         log.trace("Receiving messages with request [{}]...", request);
-        
+
         ReceiveMessageResult messageResult = null;
         try {
             messageResult = getClient().receiveMessage(request);
@@ -103,16 +103,16 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
         if (log.isTraceEnabled()) {
             log.trace("Received {} messages", messageResult.getMessages().size());
         }
-        
+
         Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
         return processBatch(CastUtils.cast(exchanges));
     }
 
     public void reConnectToQueue() {
         try {
-        	if (getEndpoint().getConfiguration().isAutoCreateQueue()) {
+            if (getEndpoint().getConfiguration().isAutoCreateQueue()) {
                 getEndpoint().createQueue(getClient());
-        	}
+            }
         } catch (QueueDeletedRecentlyException qdr) {
             log.debug("Queue recently deleted, will retry in 30 seconds.");
             try {
@@ -125,12 +125,12 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
             log.warn("Could not connect to queue in amazon.", e);
         }
     }
-    
+
     protected Queue<Exchange> createExchanges(List<Message> messages) {
         if (log.isTraceEnabled()) {
             log.trace("Received {} messages in this poll", messages.size());
         }
-        
+
         Queue<Exchange> answer = new LinkedList<>();
         for (Message message : messages) {
             Exchange exchange = getEndpoint().createExchange(message);
@@ -139,7 +139,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
 
         return answer;
     }
-    
+
     public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
 
@@ -162,10 +162,10 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
                 int repeatSeconds = Double.valueOf(visibilityTimeout.doubleValue() * 1.5).intValue();
                 if (log.isDebugEnabled()) {
                     log.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
-                            new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()});
+                              new Object[] {delay, period, repeatSeconds, exchange.getExchangeId()});
                 }
-                final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
-                        new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS);
+                final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(new TimeoutExtender(exchange, repeatSeconds), delay, period,
+                                                                                                      TimeUnit.SECONDS);
                 exchange.addOnCompletion(new Synchronization() {
                     @Override
                     public void onComplete(Exchange exchange) {
@@ -212,7 +212,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
 
         return total;
     }
-    
+
     /**
      * Strategy to delete the message after being processed.
      *
@@ -237,12 +237,9 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
     }
 
     private boolean shouldDelete(Exchange exchange) {
-        boolean shouldDeleteByFilter = exchange.getProperty(Exchange.FILTER_MATCHED) != null
-                && getConfiguration().isDeleteIfFiltered()
-                && passedThroughFilter(exchange);
+        boolean shouldDeleteByFilter = exchange.getProperty(Exchange.FILTER_MATCHED) != null && getConfiguration().isDeleteIfFiltered() && passedThroughFilter(exchange);
 
-        return getConfiguration().isDeleteAfterRead()
-                || shouldDeleteByFilter;
+        return getConfiguration().isDeleteAfterRead() || shouldDeleteByFilter;
     }
 
     private boolean passedThroughFilter(Exchange exchange) {
@@ -264,18 +261,18 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
     protected SqsConfiguration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
-    
+
     protected AmazonSQS getClient() {
         return getEndpoint().getClient();
     }
-    
+
     protected String getQueueUrl() {
         return getEndpoint().getQueueUrl();
     }
-    
+
     @Override
     public SqsEndpoint getEndpoint() {
-        return (SqsEndpoint) super.getEndpoint();
+        return (SqsEndpoint)super.getEndpoint();
     }
 
     @Override
@@ -318,8 +315,8 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
 
         @Override
         public void run() {
-            ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(getQueueUrl(),
-                    exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class), repeatSeconds);
+            ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(getQueueUrl(), exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class),
+                                                                                        repeatSeconds);
 
             try {
                 log.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
@@ -330,8 +327,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
             } catch (MessageNotInflightException e) {
                 // Ignore.
             } catch (Exception e) {
-                log.warn("Extending visibility window failed for exchange " + exchange
-                        + ". Will not attempt to extend visibility further. This exception will be ignored.", e);
+                log.warn("Extending visibility window failed for exchange " + exchange + ". Will not attempt to extend visibility further. This exception will be ignored.", e);
             }
         }
     }