You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/26 10:36:42 UTC

[camel] branch main updated: CAMEL-19562: aws sqs visibility extender is running forever (#10828)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f389cdf161c CAMEL-19562: aws sqs visibility extender is running forever (#10828)
f389cdf161c is described below

commit f389cdf161c077cbc96e0dae97c1644e6262f6b8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jul 26 12:36:35 2023 +0200

    CAMEL-19562: aws sqs visibility extender is running forever (#10828)
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 50 ++++++++++++++--------
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 521e2e4a78a..233384c061a 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -26,6 +26,7 @@ import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -184,9 +185,8 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
                             delay, period,
                             repeatSeconds, exchange.getExchangeId());
                 }
-                final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
-                        new TimeoutExtender(exchange, repeatSeconds), delay, period,
-                        TimeUnit.SECONDS);
+                final TimeoutExtender extender = new TimeoutExtender(exchange, repeatSeconds);
+                final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(extender, delay, period, TimeUnit.SECONDS);
                 exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
                     @Override
                     public void onComplete(Exchange exchange) {
@@ -202,7 +202,11 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
                         // cancel task as we are done
                         LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}",
                                 exchange.getExchangeId());
-                        scheduledFuture.cancel(false);
+                        extender.cancel();
+                        boolean cancelled = scheduledFuture.cancel(true);
+                        if (!cancelled) {
+                            LOG.warn("TimeoutExtender task for exchangeId: {} could not be cancelled", exchange.getExchangeId());
+                        }
                     }
                 });
             }
@@ -397,32 +401,40 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
 
         private final Exchange exchange;
         private final int repeatSeconds;
+        private final AtomicBoolean run = new AtomicBoolean(true);
 
         TimeoutExtender(Exchange exchange, int repeatSeconds) {
             this.exchange = exchange;
             this.repeatSeconds = repeatSeconds;
         }
 
+        public void cancel() {
+            // cancel by setting to no longer run
+            run.set(false);
+        }
+
         @Override
         public void run() {
-            ChangeMessageVisibilityRequest.Builder request
-                    = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                            .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
-
-            try {
-                LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                getEndpoint().getClient().changeMessageVisibility(request.build());
-                LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-            } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
-                // Ignore.
-            } catch (SqsException e) {
-                if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+            if (run.get()) {
+                ChangeMessageVisibilityRequest.Builder request
+                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
+                        .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
+
+                try {
+                    LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
+                    getEndpoint().getClient().changeMessageVisibility(request.build());
+                    LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
+                } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
                     // Ignore.
-                } else {
+                } catch (SqsException e) {
+                    if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+                        // Ignore.
+                    } else {
+                        logException(e);
+                    }
+                } catch (Exception e) {
                     logException(e);
                 }
-            } catch (Exception e) {
-                logException(e);
             }
         }