You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/17 12:59:27 UTC

[camel] branch main updated: CAMEL-19656 batch visibility extender task (#11102)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a29ad738c5 CAMEL-19656 batch visibility extender task (#11102)
9a29ad738c5 is described below

commit 9a29ad738c5514d1ea9dc8975b5ab353333eac21
Author: Jono Morris <jo...@xtra.co.nz>
AuthorDate: Fri Aug 18 00:59:21 2023 +1200

    CAMEL-19656 batch visibility extender task (#11102)
    
    * CAMEL-19656 batch visibility extender task
    
    * CAMEL-19656 run extender as single background task
    
    * CAMEL-19656 new list instance for each request
    
    * CAMEL-19656 remove exchange from extender when complete
    
    * CAMEL-19656 update logging collection reference
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 155 +++++++++++++--------
 .../component/aws2/sqs/AmazonSQSClientMock.java    |  16 +--
 .../SqsConsumerExtendMessageVisibilityTest.java    |   4 +-
 .../sqs/SqsDoesNotExtendMessageVisibilityTest.java |   2 +-
 4 files changed, 105 insertions(+), 72 deletions(-)

diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 4515323cca3..ee037337e5e 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -47,7 +48,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
 import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
 import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
 import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
@@ -66,6 +68,8 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(Sqs2Consumer.class);
 
+    private TimeoutExtender timeoutExtender;
+    private ScheduledFuture<?> scheduledFuture;
     private ScheduledExecutorService scheduledExecutor;
     private transient String sqsConsumerToString;
     private Collection<String> attributeNames;
@@ -169,44 +173,8 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
             // update pending number of exchanges
             pendingExchanges = total - index - 1;
 
-            // schedule task to extend visibility if enabled
-            Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
-            if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) {
-                int delay = visibilityTimeout.intValue() / 2;
-                int period = visibilityTimeout.intValue();
-                int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
-                            "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
-                            delay, period,
-                            repeatSeconds, exchange.getExchangeId());
-                }
-                final TimeoutExtender extender = new TimeoutExtender(exchange, repeatSeconds);
-                final ScheduledFuture<?> scheduledFuture
-                        = this.scheduledExecutor.scheduleAtFixedRate(extender, delay, period, TimeUnit.SECONDS);
-                exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
-                    @Override
-                    public void onComplete(Exchange exchange) {
-                        cancelExtender(exchange);
-                    }
-
-                    @Override
-                    public void onFailure(Exchange exchange) {
-                        cancelExtender(exchange);
-                    }
-
-                    private void cancelExtender(Exchange exchange) {
-                        // cancel task as we are done
-                        LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}",
-                                exchange.getExchangeId());
-                        extender.cancel();
-                        boolean cancelled = scheduledFuture.cancel(true);
-                        if (!cancelled) {
-                            LOG.warn("TimeoutExtender task for exchangeId: {} could not be cancelled",
-                                    exchange.getExchangeId());
-                        }
-                    }
-                });
+            if (this.timeoutExtender != null) {
+                timeoutExtender.add(exchange);
             }
 
             // add on completion to handle after work when the exchange is done
@@ -370,6 +338,22 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
 
             this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
                     "SqsTimeoutExtender", profile);
+
+            Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
+
+            if (visibilityTimeout != null && visibilityTimeout > 0) {
+                int delay = visibilityTimeout;
+                int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5);
+                this.timeoutExtender = new TimeoutExtender(repeatSeconds);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)",
+                            delay, delay, repeatSeconds);
+                }
+                this.scheduledFuture
+                        = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, TimeUnit.SECONDS);
+            }
         }
 
         super.doStart();
@@ -377,6 +361,16 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
 
     @Override
     protected void doShutdown() throws Exception {
+        if (timeoutExtender != null) {
+            timeoutExtender.cancel();
+            timeoutExtender = null;
+        }
+
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(true);
+            scheduledFuture = null;
+        }
+
         if (scheduledExecutor != null) {
             getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor);
             scheduledExecutor = null;
@@ -387,15 +381,43 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
 
     private class TimeoutExtender implements Runnable {
 
-        private final Exchange exchange;
+        private static final int MAX_REQUESTS = 10;
         private final int repeatSeconds;
         private final AtomicBoolean run = new AtomicBoolean(true);
+        private final Map<String, ChangeMessageVisibilityBatchRequestEntry> entries = new ConcurrentHashMap<>();
 
-        TimeoutExtender(Exchange exchange, int repeatSeconds) {
-            this.exchange = exchange;
+        TimeoutExtender(int repeatSeconds) {
             this.repeatSeconds = repeatSeconds;
         }
 
+        public void add(Exchange exchange) {
+            exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
+                @Override
+                public void onComplete(Exchange exchange) {
+                    remove(exchange);
+                }
+
+                @Override
+                public void onFailure(Exchange exchange) {
+                    remove(exchange);
+                }
+
+                private void remove(Exchange exchange) {
+                    LOG.trace("Removing exchangeId {} from the TimeoutExtender, processing done",
+                            exchange.getExchangeId());
+                    entries.remove(exchange.getExchangeId());
+                }
+            });
+
+            ChangeMessageVisibilityBatchRequestEntry entry
+                    = ChangeMessageVisibilityBatchRequestEntry.builder()
+                            .id(exchange.getExchangeId()).visibilityTimeout(repeatSeconds)
+                            .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class))
+                            .build();
+
+            entries.put(exchange.getExchangeId(), entry);
+        }
+
         public void cancel() {
             // cancel by setting to no longer run
             run.set(false);
@@ -404,32 +426,43 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
         @Override
         public void run() {
             if (run.get()) {
-                ChangeMessageVisibilityRequest.Builder request
-                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                                .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
-
-                try {
-                    LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                    getEndpoint().getClient().changeMessageVisibility(request.build());
-                    LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
-                    // Ignore.
-                } catch (SqsException e) {
-                    if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+                Queue<ChangeMessageVisibilityBatchRequestEntry> entryQueue = new LinkedList<>(entries.values());
+
+                while (!entryQueue.isEmpty()) {
+                    List<ChangeMessageVisibilityBatchRequestEntry> batchEntries = new LinkedList<>();
+                    // up to 10 requests can be sent with each ChangeMessageVisibilityBatch action
+                    while (!entryQueue.isEmpty() && batchEntries.size() < MAX_REQUESTS) {
+                        batchEntries.add(entryQueue.poll());
+                    }
+
+                    ChangeMessageVisibilityBatchRequest request
+                            = ChangeMessageVisibilityBatchRequest.builder().queueUrl(getQueueUrl()).entries(batchEntries)
+                                    .build();
+
+                    try {
+                        LOG.trace("Extending visibility window by {} seconds for request entries {}", repeatSeconds, batchEntries);
+                        getEndpoint().getClient().changeMessageVisibilityBatch(request);
+                        LOG.debug("Extended visibility window for request entries {}", batchEntries);
+                    } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
                         // Ignore.
-                    } else {
-                        logException(e);
+                    } catch (SqsException e) {
+                        if (e.getMessage()
+                                .contains("Message does not exist or is not available for visibility timeout change")) {
+                            // Ignore.
+                        } else {
+                            logException(e, batchEntries);
+                        }
+                    } catch (Exception e) {
+                        logException(e, batchEntries);
                     }
-                } catch (Exception e) {
-                    logException(e);
                 }
             }
         }
 
-        private void logException(Exception e) {
-            LOG.warn("Extending visibility window failed for exchange {}"
+        private void logException(Exception e, List<ChangeMessageVisibilityBatchRequestEntry> entries) {
+            LOG.warn("Extending visibility window failed for entries {}"
                      + ". Will not attempt to extend visibility further. This exception will be ignored.",
-                    exchange, e);
+                    entries, e);
         }
     }
 
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
index 3d68ed19002..7d3affb8cf7 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
@@ -33,8 +33,8 @@ import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.services.sqs.SqsClient;
 import software.amazon.awssdk.services.sqs.SqsServiceClientConfiguration;
 import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
-import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
 import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
 import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
 import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
@@ -63,7 +63,7 @@ public class AmazonSQSClientMock implements SqsClient {
 
     List<Message> messages = new ArrayList<>();
     Map<String, Map<String, String>> queueAttributes = new HashMap<>();
-    List<ChangeMessageVisibilityRequest> changeMessageVisibilityRequests = new CopyOnWriteArrayList<>();
+    List<ChangeMessageVisibilityBatchRequest> changeMessageVisibilityBatchRequests = new CopyOnWriteArrayList<>();
     private Map<String, CreateQueueRequest> queues = new LinkedHashMap<>();
     private Map<String, ScheduledFuture<?>> inFlight = new LinkedHashMap<>();
     private ScheduledExecutorService scheduler;
@@ -232,12 +232,12 @@ public class AmazonSQSClientMock implements SqsClient {
     public SqsServiceClientConfiguration serviceClientConfiguration() {
         return null;
     }
-
+    
     @Override
-    public ChangeMessageVisibilityResponse changeMessageVisibility(
-            ChangeMessageVisibilityRequest changeMessageVisibilityRequest) {
-        this.changeMessageVisibilityRequests.add(changeMessageVisibilityRequest);
-        return ChangeMessageVisibilityResponse.builder().build();
+    public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch(
+            ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) {
+        this.changeMessageVisibilityBatchRequests.add(changeMessageVisibilityBatchRequest);
+        return ChangeMessageVisibilityBatchResponse.builder().build();
     }
 
     @Override
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
index e972cd3524c..1ecb28ae27d 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
@@ -60,8 +60,8 @@ public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport {
         // Wait for message to arrive.
         MockEndpoint.assertIsSatisfied(context);
 
-        assertTrue(this.client.changeMessageVisibilityRequests.size() >= 1);
-        assertTrue(this.client.changeMessageVisibilityRequests.size() <= 3);
+        assertTrue(this.client.changeMessageVisibilityBatchRequests.size() >= 1);
+        assertTrue(this.client.changeMessageVisibilityBatchRequests.size() <= 3);
     }
 
     @Override
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java
index 1b5414bfa95..83b7b61f9d6 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java
@@ -59,7 +59,7 @@ public class SqsDoesNotExtendMessageVisibilityTest extends CamelTestSupport {
 
         // Wait for message to arrive.
         MockEndpoint.assertIsSatisfied(context);
-        assertEquals(0, this.client.changeMessageVisibilityRequests.size());
+        assertEquals(0, this.client.changeMessageVisibilityBatchRequests.size());
     }
 
     @Override