You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/04/16 13:33:35 UTC
svn commit: r1468376 - in
/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws:
s3/S3Consumer.java sqs/SqsConsumer.java
Author: davsclaus
Date: Tue Apr 16 11:33:34 2013
New Revision: 1468376
URL: http://svn.apache.org/r1468376
Log:
CAMEL-6286: Leverage async routing engine for S3 and SQS consumers. Thanks to Alex Hutter for the patch.
Modified:
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1468376&r1=1468375&r2=1468376&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Tue Apr 16 11:33:34 2013
@@ -27,6 +27,7 @@ import com.amazonaws.services.s3.model.O
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.Processor;
@@ -58,7 +59,7 @@ public class S3Consumer extends Schedule
pendingExchanges = 0;
String bucketName = getConfiguration().getBucketName();
- LOG.trace("Quering objects in bucket [{}]...", bucketName);
+ LOG.trace("Queueing objects in bucket [{}]...", bucketName);
ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(bucketName);
@@ -66,15 +67,19 @@ public class S3Consumer extends Schedule
listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
-
- LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+ }
Queue<Exchange> exchanges = createExchanges(listObjects.getObjectSummaries());
return processBatch(CastUtils.cast(exchanges));
}
protected Queue<Exchange> createExchanges(List<S3ObjectSummary> s3ObjectSummaries) {
- LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
+ }
Queue<Exchange> answer = new LinkedList<Exchange>();
for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
@@ -91,7 +96,7 @@ public class S3Consumer extends Schedule
for (int index = 0; index < total && isBatchAllowed(); index++) {
// only loop if we are started (allowed to run)
- Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+ final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
// add current index and total as properties
exchange.setProperty(Exchange.BATCH_INDEX, index);
exchange.setProperty(Exchange.BATCH_SIZE, total);
@@ -117,8 +122,12 @@ public class S3Consumer extends Schedule
});
LOG.trace("Processing exchange [{}]...", exchange);
-
- getProcessor().process(exchange);
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ LOG.trace("Processing exchange [{}] done.", exchange);
+ }
+ });
}
return total;
@@ -138,12 +147,11 @@ public class S3Consumer extends Schedule
LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
getAmazonS3Client().deleteObject(bucketName, key);
-
- LOG.trace("Object deleted");
+
+ LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
}
} catch (AmazonClientException e) {
- LOG.warn("Error occurred during deleting object", e);
- exchange.setException(e);
+ getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e);
}
}
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1468376&r1=1468375&r2=1468376&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Tue Apr 16 11:33:34 2013
@@ -33,6 +33,7 @@ import com.amazonaws.services.sqs.model.
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.NoFactoryAvailableException;
import org.apache.camel.Processor;
@@ -74,15 +75,19 @@ public class SqsConsumer extends Schedul
LOG.trace("Receiving messages with request [{}]...", request);
ReceiveMessageResult messageResult = getClient().receiveMessage(request);
-
- LOG.trace("Received {} messages", messageResult.getMessages().size());
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received {} messages", messageResult.getMessages().size());
+ }
Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
return processBatch(CastUtils.cast(exchanges));
}
protected Queue<Exchange> createExchanges(List<Message> messages) {
- LOG.trace("Received {} messages in this poll", messages.size());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received {} messages in this poll", messages.size());
+ }
Queue<Exchange> answer = new LinkedList<Exchange>();
for (Message message : messages) {
@@ -98,7 +103,7 @@ public class SqsConsumer extends Schedul
for (int index = 0; index < total && isBatchAllowed(); index++) {
// only loop if we are started (allowed to run)
- Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+ final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
// add current index and total as properties
exchange.setProperty(Exchange.BATCH_INDEX, index);
exchange.setProperty(Exchange.BATCH_SIZE, total);
@@ -112,11 +117,11 @@ public class SqsConsumer extends Schedul
if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) {
int delay = visibilityTimeout.intValue() / 2;
int period = visibilityTimeout.intValue();
- LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {} period (seconds) to extend exchangeId: {}",
- new Object[]{delay, period, exchange.getExchangeId()});
- int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue(); //
- LOG.debug("period :" + period);
- LOG.debug("repeatSeconds :" + repeatSeconds);
+ int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
+ new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()});
+ }
final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS);
exchange.addOnCompletion(new Synchronization() {
@@ -157,12 +162,12 @@ public class SqsConsumer extends Schedul
LOG.trace("Processing exchange [{}]...", exchange);
- try {
- // This blocks while message is consumed.
- getProcessor().process(exchange);
- } finally {
- LOG.trace("Processing exchange [{}] done.", exchange);
- }
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ LOG.trace("Processing exchange [{}] done.", exchange);
+ }
+ });
}
return total;
@@ -183,10 +188,10 @@ public class SqsConsumer extends Schedul
getClient().deleteMessage(deleteRequest);
- LOG.trace("Message deleted");
+ LOG.trace("Deleted message with receipt handle {}...", receiptHandle);
}
} catch (AmazonClientException e) {
- getExceptionHandler().handleException("Error occurred during deleting message.", e);
+ getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, e);
}
}