You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/04/16 13:33:35 UTC

svn commit: r1468376 - in /camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws: s3/S3Consumer.java sqs/SqsConsumer.java

Author: davsclaus
Date: Tue Apr 16 11:33:34 2013
New Revision: 1468376

URL: http://svn.apache.org/r1468376
Log:
CAMEL-6286: Leverage async routing engine for S3 and SQS consumers. Thanks to Alex Hutter for the patch.

Modified:
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java

Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1468376&r1=1468375&r2=1468376&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Tue Apr 16 11:33:34 2013
@@ -27,6 +27,7 @@ import com.amazonaws.services.s3.model.O
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
@@ -58,7 +59,7 @@ public class S3Consumer extends Schedule
         pendingExchanges = 0;
         
         String bucketName = getConfiguration().getBucketName();
-        LOG.trace("Quering objects in bucket [{}]...", bucketName);
+        LOG.trace("Queueing objects in bucket [{}]...", bucketName);
         
         ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
         listObjectsRequest.setBucketName(bucketName);
@@ -66,15 +67,19 @@ public class S3Consumer extends Schedule
         listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
         
         ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
-        
-        LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+        }
         
         Queue<Exchange> exchanges = createExchanges(listObjects.getObjectSummaries());
         return processBatch(CastUtils.cast(exchanges));
     }
     
     protected Queue<Exchange> createExchanges(List<S3ObjectSummary> s3ObjectSummaries) {
-        LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
+        }
         
         Queue<Exchange> answer = new LinkedList<Exchange>();
         for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
@@ -91,7 +96,7 @@ public class S3Consumer extends Schedule
 
         for (int index = 0; index < total && isBatchAllowed(); index++) {
             // only loop if we are started (allowed to run)
-            Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
             // add current index and total as properties
             exchange.setProperty(Exchange.BATCH_INDEX, index);
             exchange.setProperty(Exchange.BATCH_SIZE, total);
@@ -117,8 +122,12 @@ public class S3Consumer extends Schedule
             });
 
             LOG.trace("Processing exchange [{}]...", exchange);
-
-            getProcessor().process(exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
         }
 
         return total;
@@ -138,12 +147,11 @@ public class S3Consumer extends Schedule
                 LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
                 
                 getAmazonS3Client().deleteObject(bucketName, key);
-                
-                LOG.trace("Object deleted");
+
+                LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
             }
         } catch (AmazonClientException e) {
-            LOG.warn("Error occurred during deleting object", e);
-            exchange.setException(e);
+            getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e);
         }
     }
 

Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1468376&r1=1468375&r2=1468376&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Tue Apr 16 11:33:34 2013
@@ -33,6 +33,7 @@ import com.amazonaws.services.sqs.model.
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
@@ -74,15 +75,19 @@ public class SqsConsumer extends Schedul
         LOG.trace("Receiving messages with request [{}]...", request);
         
         ReceiveMessageResult messageResult = getClient().receiveMessage(request);
-        
-        LOG.trace("Received {} messages", messageResult.getMessages().size());
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages", messageResult.getMessages().size());
+        }
         
         Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
         return processBatch(CastUtils.cast(exchanges));
     }
     
     protected Queue<Exchange> createExchanges(List<Message> messages) {
-        LOG.trace("Received {} messages in this poll", messages.size());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", messages.size());
+        }
         
         Queue<Exchange> answer = new LinkedList<Exchange>();
         for (Message message : messages) {
@@ -98,7 +103,7 @@ public class SqsConsumer extends Schedul
 
         for (int index = 0; index < total && isBatchAllowed(); index++) {
             // only loop if we are started (allowed to run)
-            Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
             // add current index and total as properties
             exchange.setProperty(Exchange.BATCH_INDEX, index);
             exchange.setProperty(Exchange.BATCH_SIZE, total);
@@ -112,11 +117,11 @@ public class SqsConsumer extends Schedul
             if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) {
                 int delay = visibilityTimeout.intValue() / 2;
                 int period = visibilityTimeout.intValue();
-                LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {} period (seconds) to extend exchangeId: {}",
-                        new Object[]{delay, period, exchange.getExchangeId()});
-                int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue();   //
-                LOG.debug("period :" + period);
-                LOG.debug("repeatSeconds :" + repeatSeconds);
+                int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}",
+                            new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()});
+                }
                 final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
                         new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS);
                 exchange.addOnCompletion(new Synchronization() {
@@ -157,12 +162,12 @@ public class SqsConsumer extends Schedul
 
 
             LOG.trace("Processing exchange [{}]...", exchange);
-            try {
-                // This blocks while message is consumed.
-                getProcessor().process(exchange);
-            } finally {
-                LOG.trace("Processing exchange [{}] done.", exchange);
-            }
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    LOG.trace("Processing exchange [{}] done.", exchange);
+                }
+            });
         }
 
         return total;
@@ -183,10 +188,10 @@ public class SqsConsumer extends Schedul
                 
                 getClient().deleteMessage(deleteRequest);
 
-                LOG.trace("Message deleted");
+                LOG.trace("Deleted message with receipt handle {}...", receiptHandle);
             }
         } catch (AmazonClientException e) {
-            getExceptionHandler().handleException("Error occurred during deleting message.", e);
+            getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, e);
         }
     }