You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/30 14:32:35 UTC

[3/4] camel git commit: Fix Camel-8540 S3Consumer uses maxMessagesPerPoll incorrectly

Fix Camel-8540 S3Consumer uses maxMessagesPerPoll incorrectly


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dbf52d54
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dbf52d54
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dbf52d54

Branch: refs/heads/camel-2.15.x
Commit: dbf52d546f32d1506e95c20709e287b3c5c4f59b
Parents: f92078e
Author: ancosen <an...@gmail.com>
Authored: Sun Mar 29 14:24:57 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 30 14:34:45 2015 +0200

----------------------------------------------------------------------
 .../camel/component/aws/s3/S3Consumer.java      |   4 +-
 .../component/aws/s3/AmazonS3ClientMock.java    |  12 +-
 .../S3BatchConsumerMaxMessagesPerPollTest.java  | 110 +++++++++++++++++++
 3 files changed, 123 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dbf52d54/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 73656e4..bac2649 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -75,7 +75,9 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
             ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
             listObjectsRequest.setBucketName(bucketName);
             listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-            listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+            if (maxMessagesPerPoll > 0) {
+                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+            }
             if (marker != null && !getConfiguration().isDeleteAfterRead()) {
                 listObjectsRequest.setMarker(marker);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/dbf52d54/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
index de839b1..93f2186 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
@@ -75,6 +75,8 @@ import com.amazonaws.services.s3.model.StorageClass;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.services.s3.model.VersionListing;
+
+import org.apache.camel.util.ObjectHelper;
 import org.junit.Assert;
 
 public class AmazonS3ClientMock extends AmazonS3Client {
@@ -84,6 +86,8 @@ public class AmazonS3ClientMock extends AmazonS3Client {
     
     private boolean nonExistingBucketCreated;
     
+    private int maxCapacity = 100;
+    
     public AmazonS3ClientMock() {
         super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
     }
@@ -126,9 +130,13 @@ public class AmazonS3ClientMock extends AmazonS3Client {
             ex.setStatusCode(404);
             throw ex; 
         }
-        
+        int capacity;
         ObjectListing objectListing = new ObjectListing();
-        int capacity = listObjectsRequest.getMaxKeys();
+        if (!ObjectHelper.isEmpty(listObjectsRequest.getMaxKeys()) && listObjectsRequest.getMaxKeys() != null) {
+            capacity = listObjectsRequest.getMaxKeys();
+        } else {
+        	capacity = maxCapacity;
+        }
         
         for (int index = 0; index < objects.size() && index < capacity; index++) {
             S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();

http://git-wip-us.apache.org/repos/asf/camel/blob/dbf52d54/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerMaxMessagesPerPollTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerMaxMessagesPerPollTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerMaxMessagesPerPollTest.java
new file mode 100644
index 0000000..c1b9fe2
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerMaxMessagesPerPollTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3BatchConsumerMaxMessagesPerPollTest extends CamelTestSupport {
+    
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+        
+    @Test
+    public void receiveBatch() throws Exception {
+        mock.expectedMessageCount(20);
+        assertMockEndpointsSatisfied();
+        
+        mock.message(0).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(1);
+        mock.message(1).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(2);
+        mock.message(2).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(3);
+        mock.message(3).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(4);
+        mock.message(4).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(5);
+        mock.message(5).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(6);
+        mock.message(6).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(7);
+        mock.message(7).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(8);
+        mock.message(8).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(9);
+        mock.message(9).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(10);
+        mock.message(10).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(11);
+        mock.message(11).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(12);
+        mock.message(12).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(13);
+        mock.message(13).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(14);
+        mock.message(14).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(15);
+        mock.message(15).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(16);
+        mock.message(16).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(17);
+        mock.message(17).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(18);
+        mock.message(18).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(19);
+        mock.message(19).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(20);
+        mock.message(0).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(1).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(2).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(3).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(4).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(5).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(6).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(7).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(8).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(9).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(10).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(11).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(12).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(13).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(14).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(15).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(16).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(17).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(18).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(19).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(true);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 20);
+    }
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        
+        AmazonS3ClientMock clientMock = new AmazonS3ClientMock();
+        // add 20 messages
+        for (int counter = 0; counter < 20; counter++) {
+            S3Object s3Object = new S3Object();
+            s3Object.setBucketName("mycamelbucket");
+            s3Object.setKey("counter-" + counter);
+            
+            clientMock.objects.add(s3Object);
+        }
+        
+        registry.bind("amazonS3Client", clientMock);
+        
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&region=us-west-1&delay=5000&maxMessagesPerPoll=0")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file