Posted to commits@camel.apache.org by "jono-morris (via GitHub)" <gi...@apache.org> on 2023/08/14 12:30:24 UTC

[GitHub] [camel] jono-morris opened a new pull request, #11102: CAMEL-19656 batch visibility extender task

jono-morris opened a new pull request, #11102:
URL: https://github.com/apache/camel/pull/11102

   
   Processes the visibility extension entries of all exchanges submitted to the consumer with a ChangeMessageVisibilityBatchRequest, sent as a single call to changeMessageVisibilityBatch on the Amazon SqsClient.
   
   Added a TimeoutExtender class that holds the extension entries and starts and shuts down the TimeoutTask.
   
   ChangeMessageVisibilityBatchRequest sample code from Amazon: https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/javav2/example_code/sqs/src/main/java/com/example/sqs/VisibilityTimeout.java#L102


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] davsclaus commented on pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #11102:
URL: https://github.com/apache/camel/pull/11102#issuecomment-1677334303

   Thanks for helping with this. But I would like to do this in a more stable way, without all the complexity of scheduling and cancelling a task.
   
   Instead we just have a single background task that runs periodically at a fixed interval. It has an internal queue with the exchanges that are currently in flight. If the queue is empty it does nothing; otherwise it sends a batch request to AWS.
   
   Also, how long to extend can just be a fixed amount, as the task is also scheduled to run at a fixed interval.
   Then we can have 2 options the end user can control:
   
   - delay between extend task runs
   - how long to extend
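
A minimal sketch of the single periodic task described above, using only the JDK. The class name `PeriodicExtenderSketch`, the `sender` callback (a stand-in for the real SQS batch call) and the use of receipt-handle strings instead of exchanges are all assumptions made for illustration, not the code that was merged. The 10-entry limit per batch request is deliberately left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch: one background task that extends everything in flight.
final class PeriodicExtenderSketch implements Runnable {

    // Receipt handles of messages that are still being processed.
    private final Set<String> inflight = ConcurrentHashMap.newKeySet();
    // Option 2: how long to extend, a fixed amount in seconds.
    private final int extendSeconds;
    // Assumption: stands in for the call that sends the batch request to AWS.
    private final BiConsumer<List<String>, Integer> sender;

    PeriodicExtenderSketch(int extendSeconds, BiConsumer<List<String>, Integer> sender) {
        this.extendSeconds = extendSeconds;
        this.sender = sender;
    }

    void add(String receiptHandle) {
        inflight.add(receiptHandle);
    }

    void remove(String receiptHandle) {
        inflight.remove(receiptHandle);
    }

    @Override
    public void run() {
        // Copy the current in-flight handles; the set itself is not drained.
        List<String> snapshot = new ArrayList<>(inflight);
        if (snapshot.isEmpty()) {
            return; // nothing in flight, so do nothing
        }
        sender.accept(snapshot, extendSeconds);
    }

    // Option 1: delay between extend task runs, in seconds.
    ScheduledExecutorService start(long delaySeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this, delaySeconds, delaySeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

The caller owns the returned scheduler and would shut it down when the consumer stops. Keeping `run()` free of scheduling logic also makes it easy to invoke directly in a test.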
   
   




[GitHub] [camel] simon-ras commented on a diff in pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "simon-ras (via GitHub)" <gi...@apache.org>.
simon-ras commented on code in PR #11102:
URL: https://github.com/apache/camel/pull/11102#discussion_r1294582716


##########
components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java:
##########
@@ -404,32 +410,44 @@ public void cancel() {
         @Override
         public void run() {
             if (run.get()) {
-                ChangeMessageVisibilityRequest.Builder request
-                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                                .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
-
-                try {
-                    LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                    getEndpoint().getClient().changeMessageVisibility(request.build());
-                    LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
-                    // Ignore.
-                } catch (SqsException e) {
-                    if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+
+                List<ChangeMessageVisibilityBatchRequestEntry> entries = new LinkedList<>();
+
+                while (!requestQueue.isEmpty()) {
+
+                    // up to 10 requests can be sent with each ChangeMessageVisibilityBatch action
+                    while (!requestQueue.isEmpty() && entries.size() < MAX_REQUESTS) {
+                        entries.add(requestQueue.poll());

Review Comment:
   Well, I appreciate the improvements :tophat: 





[GitHub] [camel] jono-morris commented on a diff in pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "jono-morris (via GitHub)" <gi...@apache.org>.
jono-morris commented on code in PR #11102:
URL: https://github.com/apache/camel/pull/11102#discussion_r1294571928


##########
components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java:
##########
@@ -404,32 +410,44 @@ public void cancel() {
         @Override
         public void run() {
             if (run.get()) {
-                ChangeMessageVisibilityRequest.Builder request
-                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                                .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
-
-                try {
-                    LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                    getEndpoint().getClient().changeMessageVisibility(request.build());
-                    LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
-                    // Ignore.
-                } catch (SqsException e) {
-                    if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+
+                List<ChangeMessageVisibilityBatchRequestEntry> entries = new LinkedList<>();
+
+                while (!requestQueue.isEmpty()) {
+
+                    // up to 10 requests can be sent with each ChangeMessageVisibilityBatch action
+                    while (!requestQueue.isEmpty() && entries.size() < MAX_REQUESTS) {
+                        entries.add(requestQueue.poll());

Review Comment:
   Thank you, that makes sense. I really appreciate the comments. I'll make the amendments tomorrow.





[GitHub] [camel] davsclaus merged pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus merged PR #11102:
URL: https://github.com/apache/camel/pull/11102




[GitHub] [camel] simon-ras commented on a diff in pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "simon-ras (via GitHub)" <gi...@apache.org>.
simon-ras commented on code in PR #11102:
URL: https://github.com/apache/camel/pull/11102#discussion_r1293470910


##########
components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java:
##########
@@ -404,14 +434,13 @@ public void cancel() {
         @Override
         public void run() {
             if (run.get()) {
-                ChangeMessageVisibilityRequest.Builder request
-                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                                .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
+                ChangeMessageVisibilityBatchRequest.Builder request
+                        = ChangeMessageVisibilityBatchRequest.builder().queueUrl(getQueueUrl()).entries(entries);

Review Comment:
   Please be aware that the request can take at most 10 entries, according to the AWS documentation. `entries` at this point could contain more than that.





[GitHub] [camel] davsclaus commented on a diff in pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on code in PR #11102:
URL: https://github.com/apache/camel/pull/11102#discussion_r1293472109


##########
components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java:
##########
@@ -404,14 +434,13 @@ public void cancel() {
         @Override
         public void run() {
             if (run.get()) {
-                ChangeMessageVisibilityRequest.Builder request
-                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                                .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
+                ChangeMessageVisibilityBatchRequest.Builder request
+                        = ChangeMessageVisibilityBatchRequest.builder().queueUrl(getQueueUrl()).entries(entries);

Review Comment:
   Thanks, then we should split the batch into groups of 10, so if there are 25 entries we make 3 calls.
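
To illustrate the splitting suggested above, here is a minimal sketch in plain Java. The class `BatchSplitter` and its `split` method are hypothetical names, and the entries are generic so the example runs without the AWS SDK; in the consumer each sub-list would be sent as one batch request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: partitions entries into groups no larger than maxSize.
final class BatchSplitter {

    // The review comments state that a batch request takes at most 10 entries.
    static final int MAX_ENTRIES_PER_BATCH = 10;

    static <T> List<List<T>> split(List<T> entries, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < entries.size(); from += maxSize) {
            int to = Math.min(from + maxSize, entries.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(entries.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> entries = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            entries.add(i);
        }
        // 25 entries give groups of 10, 10 and 5, so 3 calls.
        for (List<Integer> batch : split(entries, MAX_ENTRIES_PER_BATCH)) {
            System.out.println(batch.size());
        }
    }
}
```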





[GitHub] [camel] davsclaus commented on pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #11102:
URL: https://github.com/apache/camel/pull/11102#issuecomment-1681639236

   @simon-ras do you have any feedback on the updated code?




[GitHub] [camel] simon-ras commented on a diff in pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "simon-ras (via GitHub)" <gi...@apache.org>.
simon-ras commented on code in PR #11102:
URL: https://github.com/apache/camel/pull/11102#discussion_r1294523629


##########
components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java:
##########
@@ -404,32 +410,44 @@ public void cancel() {
         @Override
         public void run() {
             if (run.get()) {
-                ChangeMessageVisibilityRequest.Builder request
-                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                                .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
-
-                try {
-                    LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                    getEndpoint().getClient().changeMessageVisibility(request.build());
-                    LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
-                    // Ignore.
-                } catch (SqsException e) {
-                    if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+
+                List<ChangeMessageVisibilityBatchRequestEntry> entries = new LinkedList<>();
+
+                while (!requestQueue.isEmpty()) {
+
+                    // up to 10 requests can be sent with each ChangeMessageVisibilityBatch action
+                    while (!requestQueue.isEmpty() && entries.size() < MAX_REQUESTS) {
+                        entries.add(requestQueue.poll());

Review Comment:
   The entry should only be removed from this queue once it has been processed by the consumer.
   
   The current solution will extend each message only a single time, because `poll()` removes the entry on the first run. Also, I cannot see where you remove the entry from the extend queue once it has been fully processed.
   
   I think it would be better if the consumer itself had a collection of in-flight exchanges, and the extender would then iterate that collection. The consumer then has to remove the exchanges once they are fully processed.
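
The difference between draining a queue and iterating a collection can be shown with a small JDK-only sketch. The class and method names are hypothetical and the receipt handles are plain strings; this is an illustration of the concern raised above, not the consumer's actual code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical comparison of the two approaches.
final class DrainVersusIterate {

    // Draining: poll() removes each handle, so the next run finds nothing.
    static List<String> drain(Queue<String> queue) {
        List<String> out = new ArrayList<>();
        String handle;
        while ((handle = queue.poll()) != null) {
            out.add(handle);
        }
        return out;
    }

    // Iterating: the in-flight set is copied and left untouched, so every
    // run sees each handle until the consumer removes it explicitly.
    static List<String> snapshot(Set<String> inflight) {
        return new ArrayList<>(inflight);
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("h1", "h2"));
        System.out.println(drain(queue).size()); // first run: both handles
        System.out.println(drain(queue).size()); // second run: queue is empty

        Set<String> inflight = ConcurrentHashMap.newKeySet();
        inflight.add("h1");
        inflight.add("h2");
        System.out.println(snapshot(inflight).size()); // first run: both handles
        System.out.println(snapshot(inflight).size()); // second run: still both
        inflight.remove("h1"); // the consumer removes a fully processed exchange
        System.out.println(snapshot(inflight).size()); // one handle remains
    }
}
```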





[GitHub] [camel] github-actions[bot] commented on pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #11102:
URL: https://github.com/apache/camel/pull/11102#issuecomment-1677230211

   :star2: Thank you for your contribution to the Apache Camel project! :star2: 
   
   :camel: Maintainers, please note that first-time contributors *require manual approval* for the GitHub Actions to run.
   
   :warning: Please note that the changes on this PR may be **tested automatically** if they change components.
   
   :robot: Use the command `/component-test (camel-)component-name1 (camel-)component-name2..` to request a test from the test bot.
   
   If necessary Apache Camel Committers may access logs and test results in the job summaries!




[GitHub] [camel] davsclaus commented on a diff in pull request #11102: CAMEL-19656 batch visibility extender task

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on code in PR #11102:
URL: https://github.com/apache/camel/pull/11102#discussion_r1294540343


##########
components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java:
##########
@@ -404,32 +410,44 @@ public void cancel() {
         @Override
         public void run() {
             if (run.get()) {
-                ChangeMessageVisibilityRequest.Builder request
-                        = ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                                .receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
-
-                try {
-                    LOG.trace("Extending visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                    getEndpoint().getClient().changeMessageVisibility(request.build());
-                    LOG.debug("Extended visibility window by {} seconds for exchange {}", this.repeatSeconds, this.exchange);
-                } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
-                    // Ignore.
-                } catch (SqsException e) {
-                    if (e.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
+
+                List<ChangeMessageVisibilityBatchRequestEntry> entries = new LinkedList<>();
+
+                while (!requestQueue.isEmpty()) {
+
+                    // up to 10 requests can be sent with each ChangeMessageVisibilityBatch action
+                    while (!requestQueue.isEmpty() && entries.size() < MAX_REQUESTS) {
+                        entries.add(requestQueue.poll());

Review Comment:
   The UoW (unit of work) on the exchange should remove the exchange; then it's done when the exchange is fully complete.


