You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/01/06 10:25:25 UTC
[2/3] git commit: CAMEL-7105 Added ability to auto reconnect for sqs queues with thanks to Adrian
CAMEL-7105 Added ability to auto reconnect for sqs queues with thanks to Adrian
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5fd4298
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5fd4298
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5fd4298
Branch: refs/heads/camel-2.12.x
Commit: a5fd429880e7d12444d01be6d22aeebdaa455418
Parents: 4293f06
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Jan 6 17:11:04 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Jan 6 17:25:02 2014 +0800
----------------------------------------------------------------------
.../camel/component/aws/sqs/SqsConsumer.java | 28 +++++++++++++++++++-
.../camel/component/aws/sqs/SqsEndpoint.java | 4 +--
2 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a5fd4298/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 1163743..7daa8a8 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -29,6 +29,8 @@ import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageNotInflightException;
+import com.amazonaws.services.sqs.model.QueueDeletedRecentlyException;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@@ -46,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* A Consumer of messages from the Amazon Web Service Simple Queue Service
* <a href="http://aws.amazon.com/sqs/">AWS SQS</a>
@@ -74,7 +77,14 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
LOG.trace("Receiving messages with request [{}]...", request);
- ReceiveMessageResult messageResult = getClient().receiveMessage(request);
+ ReceiveMessageResult messageResult = null;
+ try {
+ messageResult = getClient().receiveMessage(request);
+ } catch (QueueDoesNotExistException e) {
+ LOG.info("Queue does not exist....recreating now...");
+ reConnectToQueue();
+ messageResult = getClient().receiveMessage(request);
+ }
if (LOG.isTraceEnabled()) {
LOG.trace("Received {} messages", messageResult.getMessages().size());
@@ -83,6 +93,22 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
return processBatch(CastUtils.cast(exchanges));
}
+
+ public void reConnectToQueue() {
+ try {
+ getEndpoint().createQueue(getClient());
+ } catch (QueueDeletedRecentlyException qdr) {
+ LOG.debug("Queue recently deleted, will retry in 30 seconds.");
+ try {
+ Thread.sleep(30000);
+ getEndpoint().createQueue(getClient());
+ } catch (Exception e) {
+ LOG.error("failed to retry queue connection.", e);
+ }
+ } catch (Exception e) {
+ LOG.error("Could not connect to queue in amazon.", e);
+ }
+ }
protected Queue<Exchange> createExchanges(List<Message> messages) {
if (LOG.isTraceEnabled()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/a5fd4298/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index ca3ff0a..aa01c72 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -110,7 +110,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
}
}
- private void createQueue(AmazonSQS client) {
+ protected void createQueue(AmazonSQS client) {
LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
// creates a new queue, or returns the URL of an existing one
@@ -220,7 +220,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
protected String getQueueUrl() {
return queueUrl;
}
-
+
public int getMaxMessagesPerPoll() {
return maxMessagesPerPoll;
}