You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/26 08:14:34 UTC

[camel] branch sqs created (now 0fe9c48c96b)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch sqs
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 0fe9c48c96b CAMEL-19562: aws sqs visibility extender is running forever

This branch includes the following new commits:

     new 0fe9c48c96b CAMEL-19562: aws sqs visibility extender is running forever

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-19562: aws sqs visibility extender is running forever

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch sqs
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0fe9c48c96b117c7bce3fca8e4e32ee3f4f635bc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jul 26 10:14:21 2023 +0200

    CAMEL-19562: aws sqs visibility extender is running forever
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 50 ++++++++++++++--------
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 521e2e4a78a..233384c061a 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -26,6 +26,7 @@ import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -184,9 +185,8 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
                             delay, period,
                             repeatSeconds, exchange.getExchangeId());
                 }
-                final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
-                        new TimeoutExtender(exchange, repeatSeconds), delay, period,
-                        TimeUnit.SECONDS);
+                final TimeoutExtender extender = new TimeoutExtender(exchange, repeatSeconds);
+                final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(extender, delay, period, TimeUnit.SECONDS);
                 exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
                     @Override
                     public void onComplete(Exchange exchange) {
@@ -202,7 +202,11 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
                         // cancel task as we are done
                         LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}",
                                 exchange.getExchangeId());
-                        scheduledFuture.cancel(false);
+                        extender.cancel();
+                        boolean cancelled = scheduledFuture.cancel(true);
+                        if (!cancelled) {
+                            LOG.warn("TimeoutExtender task for exchangeId: {} could not be cancelled", exchange.getExchangeId());
+                        }
                     }
                 });
             }
@@ -397,32 +401,40 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
 
         private final Exchange exchange;
         private final int repeatSeconds;
+        private final AtomicBoolean run = new AtomicBoolean(true);
 
         TimeoutExtender(Exchange exchange, int repeatSeconds) {
             this.exchange = exchange;
             this.repeatSeconds = repeatSeconds;
         }
 
+        public void cancel() {
+            // cancel by setting to no longer run
+            run.set(false);
+        }
+
         @Override
         public void run() {
-            ChangeMessageVisibilityRequest.Builder request
-                    = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                            .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
-
-            try {
-                LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                getEndpoint().getClient().changeMessageVisibility(request.build());
-                LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-            } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
-                // Ignore.
-            } catch (SqsException e) {
-                if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+            if (run.get()) {
+                ChangeMessageVisibilityRequest.Builder request
+                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
+                        .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
+
+                try {
+                    LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
+                    getEndpoint().getClient().changeMessageVisibility(request.build());
+                    LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
+                } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
                     // Ignore.
-                } else {
+                } catch (SqsException e) {
+                    if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+                        // Ignore.
+                    } else {
+                        logException(e);
+                    }
+                } catch (Exception e) {
                     logException(e);
                 }
-            } catch (Exception e) {
-                logException(e);
             }
         }