You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/30 14:32:35 UTC
[3/4] camel git commit: Fix Camel-8540 S3Consumer uses maxMessagesPerPoll incorrectly
Fix Camel-8540 S3Consumer uses maxMessagesPerPoll incorrectly
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dbf52d54
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dbf52d54
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dbf52d54
Branch: refs/heads/camel-2.15.x
Commit: dbf52d546f32d1506e95c20709e287b3c5c4f59b
Parents: f92078e
Author: ancosen <an...@gmail.com>
Authored: Sun Mar 29 14:24:57 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 30 14:34:45 2015 +0200
----------------------------------------------------------------------
.../camel/component/aws/s3/S3Consumer.java | 4 +-
.../component/aws/s3/AmazonS3ClientMock.java | 12 +-
.../S3BatchConsumerMaxMessagesPerPollTest.java | 110 +++++++++++++++++++
3 files changed, 123 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dbf52d54/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 73656e4..bac2649 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -75,7 +75,9 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(bucketName);
listObjectsRequest.setPrefix(getConfiguration().getPrefix());
- listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+ if (maxMessagesPerPoll > 0) {
+ listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+ }
if (marker != null && !getConfiguration().isDeleteAfterRead()) {
listObjectsRequest.setMarker(marker);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dbf52d54/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
index de839b1..93f2186 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
@@ -75,6 +75,8 @@ import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.model.VersionListing;
+
+import org.apache.camel.util.ObjectHelper;
import org.junit.Assert;
public class AmazonS3ClientMock extends AmazonS3Client {
@@ -84,6 +86,8 @@ public class AmazonS3ClientMock extends AmazonS3Client {
private boolean nonExistingBucketCreated;
+ private int maxCapacity = 100;
+
public AmazonS3ClientMock() {
super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
}
@@ -126,9 +130,13 @@ public class AmazonS3ClientMock extends AmazonS3Client {
ex.setStatusCode(404);
throw ex;
}
-
+ int capacity;
ObjectListing objectListing = new ObjectListing();
- int capacity = listObjectsRequest.getMaxKeys();
+ if (!ObjectHelper.isEmpty(listObjectsRequest.getMaxKeys()) && listObjectsRequest.getMaxKeys() != null) {
+ capacity = listObjectsRequest.getMaxKeys();
+ } else {
+ capacity = maxCapacity;
+ }
for (int index = 0; index < objects.size() && index < capacity; index++) {
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
http://git-wip-us.apache.org/repos/asf/camel/blob/dbf52d54/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerMaxMessagesPerPollTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerMaxMessagesPerPollTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerMaxMessagesPerPollTest.java
new file mode 100644
index 0000000..c1b9fe2
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerMaxMessagesPerPollTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3BatchConsumerMaxMessagesPerPollTest extends CamelTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+
+    @Test
+    public void receiveBatch() throws Exception {
+        // Register every expectation BEFORE asserting, otherwise it is never checked; BATCH_INDEX is zero-based.
+        mock.expectedMessageCount(20);
+        mock.message(0).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(0);
+        mock.message(1).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(1);
+        mock.message(2).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(2);
+        mock.message(3).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(3);
+        mock.message(4).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(4);
+        mock.message(5).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(5);
+        mock.message(6).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(6);
+        mock.message(7).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(7);
+        mock.message(8).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(8);
+        mock.message(9).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(9);
+        mock.message(10).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(10);
+        mock.message(11).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(11);
+        mock.message(12).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(12);
+        mock.message(13).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(13);
+        mock.message(14).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(14);
+        mock.message(15).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(15);
+        mock.message(16).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(16);
+        mock.message(17).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(17);
+        mock.message(18).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(18);
+        mock.message(19).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(19);
+        mock.message(0).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(1).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(2).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(3).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(4).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(5).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(6).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(7).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(8).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(9).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(10).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(11).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(12).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(13).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(14).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(15).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(16).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(17).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(18).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(19).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(true);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 20);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+
+        AmazonS3ClientMock clientMock = new AmazonS3ClientMock();
+        // seed the mock bucket with 20 objects; the route sets maxMessagesPerPoll=0, so none should be cut off
+        for (int counter = 0; counter < 20; counter++) {
+            S3Object s3Object = new S3Object();
+            s3Object.setBucketName("mycamelbucket");
+            s3Object.setKey("counter-" + counter);
+
+            clientMock.objects.add(s3Object);
+        }
+
+        registry.bind("amazonS3Client", clientMock);
+
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&region=us-west-1&delay=5000&maxMessagesPerPoll=0")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file