You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/17 12:59:27 UTC
[camel] branch main updated: CAMEL-19656 batch visibility extender task (#11102)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9a29ad738c5 CAMEL-19656 batch visibility extender task (#11102)
9a29ad738c5 is described below
commit 9a29ad738c5514d1ea9dc8975b5ab353333eac21
Author: Jono Morris <jo...@xtra.co.nz>
AuthorDate: Fri Aug 18 00:59:21 2023 +1200
CAMEL-19656 batch visibility extender task (#11102)
* CAMEL-19656 batch visibility extender task
* CAMEL-19656 run extender as single background task
* CAMEL-19656 new list instance for each request
* CAMEL-19656 remove exchange from extender when complete
* CAMEL-19656 update logging collection reference
---
.../camel/component/aws2/sqs/Sqs2Consumer.java | 155 +++++++++++++--------
.../component/aws2/sqs/AmazonSQSClientMock.java | 16 +--
.../SqsConsumerExtendMessageVisibilityTest.java | 4 +-
.../sqs/SqsDoesNotExtendMessageVisibilityTest.java | 2 +-
4 files changed, 105 insertions(+), 72 deletions(-)
diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 4515323cca3..ee037337e5e 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -47,7 +48,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
@@ -66,6 +68,8 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(Sqs2Consumer.class);
+ private TimeoutExtender timeoutExtender;
+ private ScheduledFuture<?> scheduledFuture;
private ScheduledExecutorService scheduledExecutor;
private transient String sqsConsumerToString;
private Collection<String> attributeNames;
@@ -169,44 +173,8 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
// update pending number of exchanges
pendingExchanges = total - index - 1;
- // schedule task to extend visibility if enabled
- Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
- if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) {
- int delay = visibilityTimeout.intValue() / 2;
- int period = visibilityTimeout.intValue();
- int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5);
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
- delay, period,
- repeatSeconds, exchange.getExchangeId());
- }
- final TimeoutExtender extender = new TimeoutExtender(exchange, repeatSeconds);
- final ScheduledFuture<?> scheduledFuture
- = this.scheduledExecutor.scheduleAtFixedRate(extender, delay, period, TimeUnit.SECONDS);
- exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
- @Override
- public void onComplete(Exchange exchange) {
- cancelExtender(exchange);
- }
-
- @Override
- public void onFailure(Exchange exchange) {
- cancelExtender(exchange);
- }
-
- private void cancelExtender(Exchange exchange) {
- // cancel task as we are done
- LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}",
- exchange.getExchangeId());
- extender.cancel();
- boolean cancelled = scheduledFuture.cancel(true);
- if (!cancelled) {
- LOG.warn("TimeoutExtender task for exchangeId: {} could not be cancelled",
- exchange.getExchangeId());
- }
- }
- });
+ if (this.timeoutExtender != null) {
+ timeoutExtender.add(exchange);
}
// add on completion to handle after work when the exchange is done
@@ -370,6 +338,22 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
"SqsTimeoutExtender", profile);
+
+ Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
+
+ if (visibilityTimeout != null && visibilityTimeout > 0) {
+ int delay = visibilityTimeout;
+ int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 1.5);
+ this.timeoutExtender = new TimeoutExtender(repeatSeconds);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)",
+ delay, delay, repeatSeconds);
+ }
+ this.scheduledFuture
+ = scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, TimeUnit.SECONDS);
+ }
}
super.doStart();
@@ -377,6 +361,16 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
@Override
protected void doShutdown() throws Exception {
+ if (timeoutExtender != null) {
+ timeoutExtender.cancel();
+ timeoutExtender = null;
+ }
+
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ scheduledFuture = null;
+ }
+
if (scheduledExecutor != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutor);
scheduledExecutor = null;
@@ -387,15 +381,43 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
private class TimeoutExtender implements Runnable {
- private final Exchange exchange;
+ private static final int MAX_REQUESTS = 10;
private final int repeatSeconds;
private final AtomicBoolean run = new AtomicBoolean(true);
+ private final Map<String, ChangeMessageVisibilityBatchRequestEntry> entries = new ConcurrentHashMap<>();
- TimeoutExtender(Exchange exchange, int repeatSeconds) {
- this.exchange = exchange;
+ TimeoutExtender(int repeatSeconds) {
this.repeatSeconds = repeatSeconds;
}
+ public void add(Exchange exchange) {
+ exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
+ @Override
+ public void onComplete(Exchange exchange) {
+ remove(exchange);
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ remove(exchange);
+ }
+
+ private void remove(Exchange exchange) {
+ LOG.trace("Removing exchangeId {} from the TimeoutExtender, processing done",
+ exchange.getExchangeId());
+ entries.remove(exchange.getExchangeId());
+ }
+ });
+
+ ChangeMessageVisibilityBatchRequestEntry entry
+ = ChangeMessageVisibilityBatchRequestEntry.builder()
+ .id(exchange.getExchangeId()).visibilityTimeout(repeatSeconds)
+ .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class))
+ .build();
+
+ entries.put(exchange.getExchangeId(), entry);
+ }
+
public void cancel() {
// cancel by setting to no longer run
run.set(false);
@@ -404,32 +426,43 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
@Override
public void run() {
if (run.get()) {
- ChangeMessageVisibilityRequest.Builder request
- = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
- .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
-
- try {
- LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
- getEndpoint().getClient().changeMessageVisibility(request.build());
- LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
- } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
- // Ignore.
- } catch (SqsException e) {
- if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+ Queue<ChangeMessageVisibilityBatchRequestEntry> entryQueue = new LinkedList<>(entries.values());
+
+ while (!entryQueue.isEmpty()) {
+ List<ChangeMessageVisibilityBatchRequestEntry> batchEntries = new LinkedList<>();
+ // up to 10 requests can be sent with each ChangeMessageVisibilityBatch action
+ while (!entryQueue.isEmpty() && batchEntries.size() < MAX_REQUESTS) {
+ batchEntries.add(entryQueue.poll());
+ }
+
+ ChangeMessageVisibilityBatchRequest request
+ = ChangeMessageVisibilityBatchRequest.builder().queueUrl(getQueueUrl()).entries(batchEntries)
+ .build();
+
+ try {
+ LOG.trace("Extending visibility window by {} seconds for request entries {}", repeatSeconds, batchEntries);
+ getEndpoint().getClient().changeMessageVisibilityBatch(request);
+ LOG.debug("Extended visibility window for request entries {}", batchEntries);
+ } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
// Ignore.
- } else {
- logException(e);
+ } catch (SqsException e) {
+ if (e.getMessage()
+ .contains("Message does not exist or is not available for visibility timeout change")) {
+ // Ignore.
+ } else {
+ logException(e, batchEntries);
+ }
+ } catch (Exception e) {
+ logException(e, batchEntries);
}
- } catch (Exception e) {
- logException(e);
}
}
}
- private void logException(Exception e) {
- LOG.warn("Extending visibility window failed for exchange {}"
+ private void logException(Exception e, List<ChangeMessageVisibilityBatchRequestEntry> entries) {
+ LOG.warn("Extending visibility window failed for entries {}"
+ ". Will not attempt to extend visibility further. This exception will be ignored.",
- exchange, e);
+ entries, e);
}
}
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
index 3d68ed19002..7d3affb8cf7 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
@@ -33,8 +33,8 @@ import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsServiceClientConfiguration;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
-import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
-import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
@@ -63,7 +63,7 @@ public class AmazonSQSClientMock implements SqsClient {
List<Message> messages = new ArrayList<>();
Map<String, Map<String, String>> queueAttributes = new HashMap<>();
- List<ChangeMessageVisibilityRequest> changeMessageVisibilityRequests = new CopyOnWriteArrayList<>();
+ List<ChangeMessageVisibilityBatchRequest> changeMessageVisibilityBatchRequests = new CopyOnWriteArrayList<>();
private Map<String, CreateQueueRequest> queues = new LinkedHashMap<>();
private Map<String, ScheduledFuture<?>> inFlight = new LinkedHashMap<>();
private ScheduledExecutorService scheduler;
@@ -232,12 +232,12 @@ public class AmazonSQSClientMock implements SqsClient {
public SqsServiceClientConfiguration serviceClientConfiguration() {
return null;
}
-
+
@Override
- public ChangeMessageVisibilityResponse changeMessageVisibility(
- ChangeMessageVisibilityRequest changeMessageVisibilityRequest) {
- this.changeMessageVisibilityRequests.add(changeMessageVisibilityRequest);
- return ChangeMessageVisibilityResponse.builder().build();
+ public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch(
+ ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) {
+ this.changeMessageVisibilityBatchRequests.add(changeMessageVisibilityBatchRequest);
+ return ChangeMessageVisibilityBatchResponse.builder().build();
}
@Override
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
index e972cd3524c..1ecb28ae27d 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
@@ -60,8 +60,8 @@ public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport {
// Wait for message to arrive.
MockEndpoint.assertIsSatisfied(context);
- assertTrue(this.client.changeMessageVisibilityRequests.size() >= 1);
- assertTrue(this.client.changeMessageVisibilityRequests.size() <= 3);
+ assertTrue(this.client.changeMessageVisibilityBatchRequests.size() >= 1);
+ assertTrue(this.client.changeMessageVisibilityBatchRequests.size() <= 3);
}
@Override
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java
index 1b5414bfa95..83b7b61f9d6 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsDoesNotExtendMessageVisibilityTest.java
@@ -59,7 +59,7 @@ public class SqsDoesNotExtendMessageVisibilityTest extends CamelTestSupport {
// Wait for message to arrive.
MockEndpoint.assertIsSatisfied(context);
- assertEquals(0, this.client.changeMessageVisibilityRequests.size());
+ assertEquals(0, this.client.changeMessageVisibilityBatchRequests.size());
}
@Override