Posted to issues@camel.apache.org by "Daniel Carleton (Created) (JIRA)" <ji...@apache.org> on 2012/03/23 18:33:28 UTC

[jira] [Created] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Parallel and fault tolerant message processing for SQS endpoints.
-----------------------------------------------------------------

                 Key: CAMEL-5113
                 URL: https://issues.apache.org/jira/browse/CAMEL-5113
             Project: Camel
          Issue Type: Improvement
          Components: camel-aws
            Reporter: Daniel Carleton


I'm using Camel to implement parallel processing of jobs in an SQS queue, and ran into a few issues with the current implementation of the SQS component:

# SqsConsumer uses a blocking/synchronous processor, which prevents parallel processing of multiple messages from a single endpoint.
# Having a maxMessagesPerPoll other than one doesn't seem to make sense, because any messages not being actively processed should be left back in the queue for other consumers to have a chance with.
# Rollback processing doesn't go back to SQS and set the visibility timeout to zero, which prevents immediate retries.  (I guess if deleteAfterRead is turned on this doesn't make sense, but then doesn't turning on deleteAfterRead break fault tolerance completely?)
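Issue 3 can be illustrated with a small in-memory model of SQS visibility timeouts. This is only a sketch under assumptions: the class VisibilityModel and its methods are hypothetical names, not the AWS SDK or the Camel component, and time is passed in explicitly as seconds. It shows that a received message stays hidden until its timeout expires unless the visibility timeout is changed to zero.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of SQS visibility timeouts; not the AWS SDK.
class VisibilityModel {
    private final long defaultTimeoutSeconds;
    // message id -> time (in seconds) at which the message is visible again
    private final Map<String, Long> visibleAt = new LinkedHashMap<>();

    VisibilityModel(long defaultTimeoutSeconds) {
        this.defaultTimeoutSeconds = defaultTimeoutSeconds;
    }

    void send(String id, long now) {
        visibleAt.put(id, now);
    }

    // Returns the first visible message id and hides it for the default timeout,
    // or returns null when no message is currently visible.
    String receive(long now) {
        for (Map.Entry<String, Long> entry : visibleAt.entrySet()) {
            if (entry.getValue() <= now) {
                entry.setValue(now + defaultTimeoutSeconds);
                return entry.getKey();
            }
        }
        return null;
    }

    // Models ChangeMessageVisibility: the new timeout counts from 'now'.
    void changeVisibility(String id, long timeoutSeconds, long now) {
        if (visibleAt.containsKey(id)) {
            visibleAt.put(id, now + timeoutSeconds);
        }
    }

    // Models DeleteMessage after successful processing.
    void delete(String id) {
        visibleAt.remove(id);
    }

    public static void main(String[] args) {
        VisibilityModel queue = new VisibilityModel(30);
        queue.send("job-1", 0);
        String id = queue.receive(0);          // job-1 is now hidden until t=30
        System.out.println(queue.receive(1));  // null: a failed job cannot be retried yet
        queue.changeVisibility(id, 0, 1);      // rollback: make it visible immediately
        System.out.println(queue.receive(1));  // job-1
    }
}
```

In this model a consumer that fails at t=1 without resetting the timeout leaves the job unavailable to every consumer until t=30.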

I propose the following solutions to these problems:

# Use an asynchronous processor in SqsConsumer by way of getAsyncProcessor().  (Messages in SQS aren't guaranteed to be FIFO anyway, so there should be no issue with order of processing.)
# Replace maxMessagesPerPoll with maxInFlightMessages.  Put a semaphore in SqsConsumer to control the maximum number of in flight messages, and when polling SQS always set the number of available permits as the maximum number of messages to retrieve.
# In the onFailure callback for an exchange set the visibility timeout in SQS to zero via ChangeMessageVisibility.
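Proposal 2 could be sketched roughly as follows. This is a minimal illustration, not the actual SqsConsumer: InFlightLimiter, poll() and done() are hypothetical names, and a plain in-memory Queue stands in for SQS. In a real consumer the value computed as "wanted" would be passed as the maximum number of messages on the receive request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical sketch of the maxInFlightMessages idea; not the actual SqsConsumer.
class InFlightLimiter {
    // SQS returns at most 10 messages per ReceiveMessage request.
    private static final int SQS_MAX_BATCH = 10;

    private final int maxInFlightMessages;
    private final Semaphore permits;

    InFlightLimiter(int maxInFlightMessages) {
        this.maxInFlightMessages = maxInFlightMessages;
        this.permits = new Semaphore(maxInFlightMessages);
    }

    // Takes at most as many messages as there are free permits,
    // capped at the SQS batch limit. One permit is held per message.
    List<String> poll(Queue<String> queue) {
        int wanted = Math.min(permits.availablePermits(), SQS_MAX_BATCH);
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < wanted; i++) {
            if (!permits.tryAcquire()) {
                break;
            }
            String message = queue.poll();
            if (message == null) {
                permits.release(); // nothing to process, give the permit back
                break;
            }
            batch.add(message);
        }
        return batch;
    }

    // To be called from the completion callback of each exchange,
    // whether it succeeded or failed.
    void done() {
        permits.release();
    }

    int inFlight() {
        return maxInFlightMessages - permits.availablePermits();
    }

    public static void main(String[] args) {
        Queue<String> queue =
                new ConcurrentLinkedQueue<>(Arrays.asList("a", "b", "c", "d", "e"));
        InFlightLimiter limiter = new InFlightLimiter(2);
        System.out.println(limiter.poll(queue)); // [a, b]
        System.out.println(limiter.poll(queue)); // []
        limiter.done();                          // one exchange completed
        System.out.println(limiter.poll(queue)); // [c]
    }
}
```

Messages that are not polled stay in the queue, where other consumers can pick them up.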

How does this sound?  I'm working on a patch.  This is my first work on Camel, so if you see any problems with my approach let me know!

Thanks,

- Dan


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Posted by "Daniel Carleton (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Carleton updated CAMEL-5113:
-----------------------------------

    Description: 
I'm using Camel to implement parallel processing of jobs in an SQS queue, and ran into a few issues with the current implementation of the SQS component:

# SqsConsumer uses a blocking/synchronous processor, which prevents parallel processing of multiple messages from a single endpoint.
# Having a maxMessagesPerPoll other than one doesn't seem to make sense, because any messages not being actively processed should be left back in the queue for other consumers to have a chance with.
# Rollback processing doesn't go back to SQS and set the visibility timeout to zero, which prevents immediate retries. 

I propose the following solutions to these problems:

# Use an asynchronous processor in SqsConsumer by way of getAsyncProcessor().  (Messages in SQS aren't guaranteed to be FIFO anyway, so there should be no issue with order of processing.)
# Replace maxMessagesPerPoll with maxInFlightMessages.  Put a semaphore in SqsConsumer to control the maximum number of in flight messages, and when polling SQS always set the number of available permits as the maximum number of messages to retrieve.
# In the onFailure callback for an exchange set the visibility timeout in SQS to zero via ChangeMessageVisibility.

How does this sound?  I'm working on a patch.  This is my first work on Camel, so if you see any problems with my approach let me know!

Thanks,

- Dan


  was:
I'm using Camel to implement parallel processing of jobs in an SQS queue, and ran into a few issues with the current implementation of the SQS component:

# SqsConsumer uses a blocking/synchronous processor, which prevents parallel processing of multiple messages from a single endpoint.
# Having a maxMessagesPerPoll other than one doesn't seem to make sense, because any messages not being actively processed should be left back in the queue for other consumers to have a chance with.
# Rollback processing doesn't go back to SQS and set the visibility timeout to zero, which prevents immediate retries.  (I guess if deleteAfterRead is turned on this doesn't make sense, but then doesn't turning on deleteAfterRead break fault tolerance completely?)

I propose the following solutions to these problems:

# Use an asynchronous processor in SqsConsumer by way of getAsyncProcessor().  (Messages in SQS aren't guaranteed to be FIFO anyway, so there should be no issue with order of processing.)
# Replace maxMessagesPerPoll with maxInFlightMessages.  Put a semaphore in SqsConsumer to control the maximum number of in flight messages, and when polling SQS always set the number of available permits as the maximum number of messages to retrieve.
# In the onFailure callback for an exchange set the visibility timeout in SQS to zero via ChangeMessageVisibility.

How does this sound?  I'm working on a patch.  This is my first work on Camel, so if you see any problems with my approach let me know!

Thanks,

- Dan


    

        

[jira] [Commented] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Posted by "Claus Ibsen (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13237865#comment-13237865 ] 

Claus Ibsen commented on CAMEL-5113:
------------------------------------

You can read about the throttle route policy here
http://camel.apache.org/routepolicy

And btw, I also wonder why some of the AWS components were created as BatchConsumers. Maybe it was a copy/paste from another component, or maybe it was on purpose.
http://camel.apache.org/batch-consumer.html
                

        

[jira] [Commented] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Posted by "Claus Ibsen (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13237835#comment-13237835 ] 

Claus Ibsen commented on CAMEL-5113:
------------------------------------

Ad 1)
+1
This is fine

Ad 2)
-1
No, leave it as is; this is how batch consumers work in Camel.

Ad 3)
+1
Yeah, if some rollback is possible, then that is fine.
                

        

[jira] [Commented] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Posted by "Claus Ibsen (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13237836#comment-13237836 ] 

Claus Ibsen commented on CAMEL-5113:
------------------------------------

Ad 2)
The default for maxMessagesPerPoll should probably be changed from 10 to 1, as you say. But do not introduce more complexity with in-flight semaphores and whatnot. People can use the throttling in-flight route policy (ThrottlingInflightRoutePolicy) for this kind of behavior.
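
The idea behind an in-flight throttle can be modeled in a few lines. The sketch below is not Camel's implementation: InflightThrottle and its method names are hypothetical, and the exact comparisons the real policy uses should be checked against the Camel documentation. It assumes the consumer is suspended once the in-flight count reaches a maximum and resumed once the count falls to a percentage of that maximum.

```java
// Hypothetical model of an in-flight throttle with a resume threshold;
// not Camel's ThrottlingInflightRoutePolicy implementation.
class InflightThrottle {
    private final int maxInflight;
    private final int resumeAt;
    private int inflight;
    private boolean suspended;

    InflightThrottle(int maxInflight, int resumePercentOfMax) {
        this.maxInflight = maxInflight;
        // Integer arithmetic: for example 4 * 50 / 100 = 2.
        this.resumeAt = maxInflight * resumePercentOfMax / 100;
    }

    // Called when an exchange starts being routed.
    synchronized void exchangeBegin() {
        inflight++;
        if (inflight >= maxInflight) {
            suspended = true; // stop polling for new messages
        }
    }

    // Called when an exchange completes, successfully or not.
    synchronized void exchangeDone() {
        inflight--;
        if (suspended && inflight <= resumeAt) {
            suspended = false; // enough capacity again, resume polling
        }
    }

    synchronized boolean consumerSuspended() {
        return suspended;
    }

    public static void main(String[] args) {
        InflightThrottle throttle = new InflightThrottle(4, 50);
        for (int i = 0; i < 4; i++) {
            throttle.exchangeBegin();
        }
        System.out.println(throttle.consumerSuspended()); // true
        throttle.exchangeDone();
        System.out.println(throttle.consumerSuspended()); // true (3 in flight, resumes at 2)
        throttle.exchangeDone();
        System.out.println(throttle.consumerSuspended()); // false
    }
}
```

The gap between the suspend and resume thresholds keeps the consumer from flapping between the two states on every completed exchange.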

                

        

[jira] [Commented] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Posted by "Daniel Carleton (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13260811#comment-13260811 ] 

Daniel Carleton commented on CAMEL-5113:
----------------------------------------

I ended up not needing concurrency for my project, so setting maxMessagesPerPoll=1 was sufficient for my purposes.  Others here are aware of the issue, though, so hopefully a patch will result at some point!

                

        

[jira] [Commented] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Posted by "Claus Ibsen (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13243137#comment-13243137 ] 

Claus Ibsen commented on CAMEL-5113:
------------------------------------

Daniel, yeah, that will be good. Looking forward to the patch(es).
                

        

[jira] [Commented] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Posted by "Daniel Carleton (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13238728#comment-13238728 ] 

Daniel Carleton commented on CAMEL-5113:
----------------------------------------

Thanks for the feedback, Claus.

Ok, sounds like I should apply #1 and #3, change the default for maxMessagesPerPoll to 1, and use ThrottlingInflightRoutePolicy to control concurrency.

SQS does support grabbing multiple messages per request, so I suppose a BatchConsumer makes some sense.  If there's only a single Camel context consuming from the SQS queue, then it doesn't make a difference.  However, if you have multiple processes polling the SQS queue, then each one should only reserve as many messages as it can process at a time.  Otherwise, messages will queue up inside one consumer while the others may sit idle, and processing latency will increase.
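
A back-of-the-envelope model makes the latency argument concrete. The sketch below rests on simplifying assumptions that are not from the thread: all messages are already in the queue at time zero, each consumer processes its batch sequentially, every message takes the same time, and visibility timeouts never expire. SqsBatchLatencyModel is a hypothetical name.

```java
// Hypothetical model: how the batch size per poll affects completion times
// when several sequential consumers share one queue.
class SqsBatchLatencyModel {

    // Returns the completion time in seconds of each message, in the order
    // the messages are taken from the queue.
    static int[] completionTimes(int messages, int consumers,
                                 int batchSize, int secondsPerMessage) {
        int[] freeAt = new int[consumers]; // time at which each consumer is idle
        int[] done = new int[messages];
        int next = 0;
        while (next < messages) {
            // The consumer that becomes idle first polls next.
            int c = 0;
            for (int i = 1; i < consumers; i++) {
                if (freeAt[i] < freeAt[c]) {
                    c = i;
                }
            }
            // It reserves up to batchSize messages and works through them.
            int take = Math.min(batchSize, messages - next);
            for (int k = 0; k < take; k++) {
                freeAt[c] += secondsPerMessage;
                done[next++] = freeAt[c];
            }
        }
        return done;
    }

    static int max(int[] values) {
        int result = 0;
        for (int v : values) {
            result = Math.max(result, v);
        }
        return result;
    }

    static double mean(int[] values) {
        double sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    public static void main(String[] args) {
        int[] greedy = completionTimes(10, 2, 10, 1);
        int[] single = completionTimes(10, 2, 1, 1);
        // Batch of 10: one consumer reserves everything, the other sits idle.
        System.out.println(max(greedy) + " " + mean(greedy)); // 10 5.5
        // Batch of 1: both consumers share the work.
        System.out.println(max(single) + " " + mean(single)); // 5 3.0
    }
}
```

With ten one-second jobs and two consumers, reserving one message at a time halves the total time and lowers the mean completion time from 5.5 to 3 seconds in this model.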

                

        

[jira] [Commented] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints.

Posted by "Daniel Carleton (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13236837#comment-13236837 ] 

Daniel Carleton commented on CAMEL-5113:
----------------------------------------

Related thread on mailing list:

http://camel.465427.n5.nabble.com/Better-Way-to-Achieve-Parallel-Processing-of-SQS-Messages-td5578135.html

                