You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/03/31 18:46:39 UTC
svn commit: r1087367 [1/2] - in /camel/trunk:
components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/
components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/
components/camel-aws/src/test/java/org/apache/came...
Author: cmueller
Date: Thu Mar 31 16:46:38 2011
New Revision: 1087367
URL: http://svn.apache.org/viewvc?rev=1087367&view=rev
Log:
CAMEL-3546: Add S3 support to camel-aws
Added:
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java
camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/
camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml
camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/aws/AmazonS3ClientMock.java
camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/aws/AwsS3IntegrationTest.java
camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/aws/AwsS3Test.java
Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Defines the <a href="http://aws.amazon.com/s3/">AWS S3 Component</a>
+ */
+public class S3Component extends DefaultComponent {
+
+ public S3Component() {
+ super();
+ }
+
+ public S3Component(CamelContext context) {
+ super(context);
+ }
+
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ S3Configuration configuration = new S3Configuration();
+ setProperties(configuration, parameters);
+
+ if (remaining == null || remaining.trim().length() == 0) {
+ throw new IllegalArgumentException("Bucket name must be specified.");
+ }
+ configuration.setBucketName(remaining);
+
+ if (configuration.getAmazonS3Client() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) {
+ throw new IllegalArgumentException("AmazonS3Client or accessKey and secretKey must be specified");
+ }
+
+ S3Endpoint endpoint = new S3Endpoint(uri, getCamelContext(), configuration);
+ return endpoint;
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+
+/**
+ * The AWS S3 component configuration properties
+ *
+ */
+public class S3Configuration implements Cloneable {
+
+ private String accessKey;
+ private String secretKey;
+ private AmazonS3Client amazonS3Client;
+
+ private String bucketName;
+ private String region;
+ private boolean deleteAfterRead = true;
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public AmazonS3Client getAmazonS3Client() {
+ return amazonS3Client;
+ }
+
+ public void setAmazonS3Client(AmazonS3Client amazonS3Client) {
+ this.amazonS3Client = amazonS3Client;
+ }
+
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ public void setBucketName(String bucketName) {
+ this.bucketName = bucketName;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ public boolean isDeleteAfterRead() {
+ return deleteAfterRead;
+ }
+
+ public void setDeleteAfterRead(boolean deleteAfterRead) {
+ this.deleteAfterRead = deleteAfterRead;
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+/**
+ * Constants used in Camel AWS S3 module
+ *
+ */
+public interface S3Constants {
+
+ String BUCKET_NAME = "CamelAwsS3BucketName";
+ String CACHE_CONTROL = "CamelAwsS3ContentControl";
+ String CONTENT_DISPOSITION = "CamelAwsS3ContentDisposition";
+ String CONTENT_ENCODING = "CamelAwsS3ContentEncoding";
+ String CONTENT_LENGTH = "CamelAwsS3ContentLength";
+ String CONTENT_MD5 = "CamelAwsS3ContentMD5";
+ String CONTENT_TYPE = "CamelAwsS3ContentType";
+ String E_TAG = "CamelAwsS3ETag";
+ String KEY = "CamelAwsS3Key";
+ String LAST_MODIFIED = "CamelAwsS3LastModified";
+ String VERSION_ID = "CamelAwsS3VersionId";
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import org.apache.camel.BatchConsumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoFactoryAvailableException;
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Amazon Web Service Simple Storage Service
+ * <a href="http://aws.amazon.com/s3/">AWS S3</a>
+ *
+ */
+public class S3Consumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
+
+ private volatile ShutdownRunningTask shutdownRunningTask;
+ private volatile int pendingExchanges;
+
+ public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
+ super(endpoint, processor);
+ }
+
+ @Override
+ protected int poll() throws Exception {
+ // must reset for each poll
+ shutdownRunningTask = null;
+ pendingExchanges = 0;
+
+ String bucketName = getConfiguration().getBucketName();
+ LOG.trace("Quering objects in bucket [{}]...", bucketName);
+
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+ listObjectsRequest.setBucketName(bucketName);
+ listObjectsRequest.setMaxKeys(getMaxMessagesPerPoll());
+
+ ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+
+ LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+
+ Queue<Exchange> exchanges = createExchanges(listObjects.getObjectSummaries());
+ return processBatch(CastUtils.cast(exchanges));
+ }
+
+ protected Queue<Exchange> createExchanges(List<S3ObjectSummary> s3ObjectSummaries) {
+ LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
+
+ Queue<Exchange> answer = new LinkedList<Exchange>();
+ for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
+ S3Object s3Object = getAmazonS3Client().getObject(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey());
+ Exchange exchange = getEndpoint().createExchange(s3Object);
+ answer.add(exchange);
+ }
+
+ return answer;
+ }
+
+ public int processBatch(Queue<Object> exchanges) throws Exception {
+ int total = exchanges.size();
+
+ for (int index = 0; index < total && isBatchAllowed(); index++) {
+ // only loop if we are started (allowed to run)
+ Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+ // add current index and total as properties
+ exchange.setProperty(Exchange.BATCH_INDEX, index);
+ exchange.setProperty(Exchange.BATCH_SIZE, total);
+ exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+ // update pending number of exchanges
+ pendingExchanges = total - index - 1;
+
+ // add on completion to handle after work when the exchange is done
+ exchange.addOnCompletion(new Synchronization() {
+ public void onComplete(Exchange exchange) {
+ processCommit(exchange);
+ }
+
+ public void onFailure(Exchange exchange) {
+ processRollback(exchange);
+ }
+
+ @Override
+ public String toString() {
+ return "S3ConsumerOnCompletion";
+ }
+ });
+
+ LOG.trace("Processing exchange [{}]...", exchange);
+
+ getProcessor().process(exchange);
+ }
+
+ return total;
+ }
+
+ /**
+ * Strategy to delete the message after being processed.
+ *
+ * @param exchange the exchange
+ */
+ protected void processCommit(Exchange exchange) {
+ try {
+ if (getConfiguration().isDeleteAfterRead()) {
+ String bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
+ String key = exchange.getIn().getHeader(S3Constants.KEY, String.class);
+
+ LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
+
+ getAmazonS3Client().deleteObject(bucketName, key);
+
+ LOG.trace("Object deleted");
+ }
+ } catch (AmazonClientException e) {
+ LOG.warn("Error occurred during deleting object", e);
+ exchange.setException(e);
+ }
+ }
+
+ /**
+ * Strategy when processing the exchange failed.
+ *
+ * @param exchange the exchange
+ */
+ protected void processRollback(Exchange exchange) {
+ Exception cause = exchange.getException();
+ if (cause != null) {
+ LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
+ } else {
+ LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
+ }
+ }
+
+ public boolean isBatchAllowed() {
+ // stop if we are not running
+ boolean answer = isRunAllowed();
+ if (!answer) {
+ return false;
+ }
+
+ if (shutdownRunningTask == null) {
+ // we are not shutting down so continue to run
+ return true;
+ }
+
+ // we are shutting down so only continue if we are configured to complete all tasks
+ return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
+ }
+
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ // store a reference what to do in case when shutting down and we have pending messages
+ this.shutdownRunningTask = shutdownRunningTask;
+ // do not defer shutdown
+ return false;
+ }
+
+ public int getPendingExchangesSize() {
+ // only return the real pending size in case we are configured to complete all tasks
+ if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
+ return pendingExchanges;
+ } else {
+ return 0;
+ }
+ }
+
+ public void prepareShutdown() {
+ // noop
+ }
+
+ protected S3Configuration getConfiguration() {
+ return getEndpoint().getConfiguration();
+ }
+
+ protected AmazonS3Client getAmazonS3Client() {
+ return getEndpoint().getS3Client();
+ }
+
+ @Override
+ public S3Endpoint getEndpoint() {
+ return (S3Endpoint) super.getEndpoint();
+ }
+
+ public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+ getEndpoint().setMaxMessagesPerPoll(maxMessagesPerPoll);
+ }
+
+ public int getMaxMessagesPerPoll() {
+ return getEndpoint().getMaxMessagesPerPoll();
+ }
+
+ @Override
+ public String toString() {
+ return "S3Consumer[" + DefaultEndpoint.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.util.List;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.Bucket;
+import com.amazonaws.services.s3.model.CreateBucketRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the <a href="http://camel.apache.org/aws.html">AWS S3 Endpoint</a>.
+ *
+ */
+public class S3Endpoint extends ScheduledPollEndpoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3Endpoint.class);
+
+ private AmazonS3Client s3Client;
+ private S3Configuration configuration;
+ private int maxMessagesPerPoll;
+
+ public S3Endpoint(String uri, CamelContext context, S3Configuration configuration) {
+ super(uri, context);
+ this.configuration = configuration;
+ }
+
+ public Consumer createConsumer(Processor processor) throws Exception {
+ S3Consumer s3Consumer = new S3Consumer(this, processor);
+ configureConsumer(s3Consumer);
+ return s3Consumer;
+ }
+
+ public Producer createProducer() throws Exception {
+ return new S3Producer(this);
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ public void doStart() throws Exception {
+ super.doStart();
+
+ String bucketName = getConfiguration().getBucketName();
+ LOG.trace("Quering whether bucket [{}] already exists...", bucketName);
+
+ List<Bucket> buckets = getS3Client().listBuckets();
+ for (Bucket bucket : buckets) {
+ if (bucketName.equals(bucket.getName())) {
+ LOG.trace("Bucket [{}] already exist", bucketName);
+ return;
+ }
+ }
+
+ LOG.trace("Bucket [{}] doesn't exist yet", bucketName);
+
+ // creates the new bucket because it doesn't exist yet
+ CreateBucketRequest createBucketRequest = new CreateBucketRequest(getConfiguration().getBucketName());
+ if (getConfiguration().getRegion() != null) {
+ createBucketRequest.setRegion(getConfiguration().getRegion());
+ }
+
+ LOG.trace("Creating bucket [{}] in region [{}] with request [{}]...", new Object[]{configuration.getBucketName(), configuration.getRegion(), createBucketRequest});
+
+ getS3Client().createBucket(createBucketRequest);
+
+ LOG.trace("Bucket created");
+ }
+
+ public Exchange createExchange(S3Object s3Object) {
+ return createExchange(getExchangePattern(), s3Object);
+ }
+
+ public Exchange createExchange(ExchangePattern pattern, S3Object s3Object) {
+ LOG.trace("Getting object with key [{}] from bucket [{}]...", s3Object.getKey(), s3Object.getBucketName());
+
+ ObjectMetadata objectMetadata = s3Object.getObjectMetadata();
+
+ LOG.trace("Got object [{}]", s3Object);
+
+ Exchange exchange = new DefaultExchange(this, pattern);
+ Message message = exchange.getIn();
+ message.setBody(s3Object.getObjectContent());
+ message.setHeader(S3Constants.KEY, s3Object.getKey());
+ message.setHeader(S3Constants.BUCKET_NAME, s3Object.getBucketName());
+ message.setHeader(S3Constants.E_TAG, objectMetadata.getETag());
+ message.setHeader(S3Constants.LAST_MODIFIED, objectMetadata.getLastModified());
+ message.setHeader(S3Constants.VERSION_ID, objectMetadata.getVersionId());
+ message.setHeader(S3Constants.LAST_MODIFIED, objectMetadata.getLastModified());
+ message.setHeader(S3Constants.CONTENT_TYPE, objectMetadata.getContentType());
+ message.setHeader(S3Constants.CONTENT_MD5, objectMetadata.getContentMD5());
+ message.setHeader(S3Constants.CONTENT_LENGTH, objectMetadata.getContentLength());
+ message.setHeader(S3Constants.CONTENT_ENCODING, objectMetadata.getContentEncoding());
+ message.setHeader(S3Constants.CONTENT_DISPOSITION, objectMetadata.getContentDisposition());
+ message.setHeader(S3Constants.CACHE_CONTROL, objectMetadata.getCacheControl());
+
+ return exchange;
+ }
+
+ public S3Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(S3Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ public void setS3Client(AmazonS3Client s3Client) {
+ this.s3Client = s3Client;
+ }
+
+ public AmazonS3Client getS3Client() {
+ if (s3Client == null) {
+ s3Client = configuration.getAmazonS3Client() != null
+ ? configuration.getAmazonS3Client() : createS3Client();
+ }
+
+ return s3Client;
+ }
+
+ /**
+ * Provide the possibility to override this method for an mock implementation
+ *
+ * @return AmazonS3Client
+ */
+ AmazonS3Client createS3Client() {
+ AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+ return new AmazonS3Client(credentials);
+ }
+
+ public int getMaxMessagesPerPoll() {
+ return maxMessagesPerPoll;
+ }
+
+ public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+ this.maxMessagesPerPoll = maxMessagesPerPoll;
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.InputStream;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Amazon Web Service Simple Storage Service
+ * <a href="http://aws.amazon.com/s3/">AWS S3</a>
+ */
+public class S3Producer extends DefaultProducer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3Producer.class);
+
+ public S3Producer(Endpoint endpoint) {
+ super(endpoint);
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+
+ PutObjectRequest putObjectRequest = new PutObjectRequest(
+ getConfiguration().getBucketName(),
+ determineKey(exchange),
+ exchange.getIn().getMandatoryBody(InputStream.class),
+ objectMetadata);
+
+ LOG.trace("Put object [{}] from exchange [{}]...", putObjectRequest, exchange);
+
+ PutObjectResult putObjectResult = getEndpoint().getS3Client().putObject(putObjectRequest);
+
+ LOG.trace("Received result [{}]", putObjectResult);
+
+ Message message = getMessageForResponse(exchange);
+ message.setHeader(S3Constants.E_TAG, putObjectResult.getETag());
+ if (putObjectResult.getVersionId() != null) {
+ message.setHeader(S3Constants.VERSION_ID, putObjectResult.getVersionId());
+ }
+ }
+
+ private String determineKey(Exchange exchange) {
+ String key = exchange.getIn().getHeader(S3Constants.KEY, String.class);
+ if (key == null) {
+ throw new IllegalArgumentException("AWS S3 Key header missing.");
+ }
+ return key;
+ }
+
+ private Message getMessageForResponse(Exchange exchange) {
+ if (exchange.getPattern().isOutCapable()) {
+ Message out = exchange.getOut();
+ out.copyFrom(exchange.getIn());
+ return out;
+ }
+
+ return exchange.getIn();
+ }
+
+ protected S3Configuration getConfiguration() {
+ return getEndpoint().getConfiguration();
+ }
+
+ @Override
+ public String toString() {
+ return "S3Producer[" + DefaultEndpoint.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+ }
+
+ @Override
+ public S3Endpoint getEndpoint() {
+ return (S3Endpoint) super.getEndpoint();
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3 (added)
+++ camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3 Thu Mar 31 16:46:38 2011
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+class=org.apache.camel.component.aws.s3.S3Component
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.HttpMethod;
+import com.amazonaws.Request;
+import com.amazonaws.http.HttpMethodName;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ResponseMetadata;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.Bucket;
+import com.amazonaws.services.s3.model.BucketLoggingConfiguration;
+import com.amazonaws.services.s3.model.BucketNotificationConfiguration;
+import com.amazonaws.services.s3.model.BucketPolicy;
+import com.amazonaws.services.s3.model.BucketVersioningConfiguration;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyObjectResult;
+import com.amazonaws.services.s3.model.CreateBucketRequest;
+import com.amazonaws.services.s3.model.DeleteBucketRequest;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteVersionRequest;
+import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListBucketsRequest;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListPartsRequest;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.MultipartUploadListing;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.Owner;
+import com.amazonaws.services.s3.model.PartListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.SetBucketLoggingConfigurationRequest;
+import com.amazonaws.services.s3.model.SetBucketVersioningConfigurationRequest;
+import com.amazonaws.services.s3.model.StorageClass;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.services.s3.model.VersionListing;
+
+public class AmazonS3ClientMock extends AmazonS3Client {
+
+ List<S3Object> objects = new ArrayList<S3Object>();
+
+ public AmazonS3ClientMock() {
+ super(null);
+ }
+
+ @Override
+ public VersionListing listNextBatchOfVersions(VersionListing previousVersionListing) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VersionListing listVersions(String bucketName, String prefix) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VersionListing listVersions(String bucketName, String prefix, String keyMarker, String versionIdMarker, String delimiter, Integer maxKeys)
+ throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VersionListing listVersions(ListVersionsRequest listVersionsRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ObjectListing listObjects(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ObjectListing listObjects(String bucketName, String prefix) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) throws AmazonClientException, AmazonServiceException {
+ ObjectListing objectListing = new ObjectListing();
+ int capacity = listObjectsRequest.getMaxKeys();
+
+ for (int index = 0; index < objects.size() && index < capacity; index++) {
+ S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
+ s3ObjectSummary.setBucketName(objects.get(index).getBucketName());
+ s3ObjectSummary.setKey(objects.get(index).getKey());
+
+ objectListing.getObjectSummaries().add(s3ObjectSummary);
+ }
+
+ return objectListing;
+ }
+
+ @Override
+ public ObjectListing listNextBatchOfObjects(ObjectListing previousObjectListing) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Owner getS3AccountOwner() throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<Bucket> listBuckets(ListBucketsRequest listBucketsRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<Bucket> listBuckets() throws AmazonClientException, AmazonServiceException {
+ return new ArrayList<Bucket>();
+ }
+
+ @Override
+ public String getBucketLocation(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Bucket createBucket(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Bucket createBucket(String bucketName, Region region) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Bucket createBucket(String bucketName, String region) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Bucket createBucket(CreateBucketRequest createBucketRequest) throws AmazonClientException, AmazonServiceException {
+ Bucket bucket = new Bucket();
+ bucket.setName(createBucketRequest.getBucketName());
+ bucket.setCreationDate(new Date());
+ bucket.setOwner(new Owner("c2efc7302b9011ba9a78a92ac5fd1cd47b61790499ab5ddf5a37c31f0638a8fc ", "Christian Mueller"));
+ return bucket;
+ }
+
+ @Override
+ public AccessControlList getObjectAcl(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public AccessControlList getObjectAcl(String bucketName, String key, String versionId) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setObjectAcl(String bucketName, String key, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setObjectAcl(String bucketName, String key, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setObjectAcl(String bucketName, String key, String versionId, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setObjectAcl(String bucketName, String key, String versionId, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public AccessControlList getBucketAcl(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBucketAcl(String bucketName, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBucketAcl(String bucketName, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ObjectMetadata getObjectMetadata(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+ for (S3Object s3Object : objects) {
+ if (bucketName.equals(s3Object.getBucketName()) && key.equals(s3Object.getKey())) {
+ return s3Object;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean doesBucketExist(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void changeObjectStorageClass(String bucketName, String key, StorageClass newStorageClass) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ObjectMetadata getObject(GetObjectRequest getObjectRequest, File destinationFile) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteBucket(String bucketName) throws AmazonClientException, AmazonServiceException {
+ // noop
+ }
+
+ @Override
+ public void deleteBucket(DeleteBucketRequest deleteBucketRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PutObjectResult putObject(String bucketName, String key, File file) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException, AmazonServiceException {
+ S3Object s3Object = new S3Object();
+ s3Object.setBucketName(putObjectRequest.getBucketName());
+ s3Object.setKey(putObjectRequest.getKey());
+ s3Object.setObjectContent(putObjectRequest.getInputStream());
+ objects.add(s3Object);
+
+ PutObjectResult putObjectResult = new PutObjectResult();
+ putObjectResult.setETag("3a5c8b1ad448bca04584ecb55b836264");
+ return putObjectResult;
+ }
+
+ @Override
+ public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName, String destinationKey) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+ //noop
+ }
+
+ @Override
+ public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteVersion(String bucketName, String key, String versionId) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteVersion(DeleteVersionRequest deleteVersionRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBucketVersioningConfiguration(SetBucketVersioningConfigurationRequest setBucketVersioningConfigurationRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BucketVersioningConfiguration getBucketVersioningConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBucketNotificationConfiguration(String bucketName, BucketNotificationConfiguration bucketNotificationConfiguration) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BucketNotificationConfiguration getBucketNotificationConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BucketLoggingConfiguration getBucketLoggingConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBucketLoggingConfiguration(SetBucketLoggingConfigurationRequest setBucketLoggingConfigurationRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BucketPolicy getBucketPolicy(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBucketPolicy(String bucketName, String policyText) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteBucketPolicy(String bucketName) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public URL generatePresignedUrl(String bucketName, String key, Date expiration) throws AmazonClientException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public URL generatePresignedUrl(String bucketName, String key, Date expiration, HttpMethod method) throws AmazonClientException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public URL generatePresignedUrl(GeneratePresignedUrlRequest generatePresignedUrlRequest) throws AmazonClientException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void abortMultipartUpload(AbortMultipartUploadRequest abortMultipartUploadRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest completeMultipartUploadRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest initiateMultipartUploadRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PartListing listParts(ListPartsRequest listPartsRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UploadPartResult uploadPart(UploadPartRequest uploadPartRequest) throws AmazonClientException, AmazonServiceException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public S3ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected Request<Void> createRequest(String bucketName, String key, AmazonWebServiceRequest originalRequest) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected <T> void signRequest(Request<T> request, HttpMethodName methodName, String bucketName, String key) {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3BatchConsumerTest extends CamelTestSupport {
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint mock;
+
+ @Test
+ public void receiveBatch() throws Exception {
+ mock.expectedMessageCount(5);
+ assertMockEndpointsSatisfied();
+
+ mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+ mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+ mock.message(2).property(Exchange.BATCH_INDEX).isEqualTo(2);
+ mock.message(3).property(Exchange.BATCH_INDEX).isEqualTo(3);
+ mock.message(4).property(Exchange.BATCH_INDEX).isEqualTo(4);
+ mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(4).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+ mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 5);
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+
+ AmazonS3ClientMock clientMock = new AmazonS3ClientMock();
+ // add 6 messages, one more we will poll
+ for (int counter = 0; counter < 6; counter++) {
+ S3Object s3Object = new S3Object();
+ s3Object.setBucketName("mycamelbucket");
+ s3Object.setKey("counter-" + counter);
+
+ clientMock.objects.add(s3Object);
+ }
+
+ registry.bind("amazonS3Client", clientMock);
+
+ return registry;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client®ion=us-west-1&delay=5000&maxMessagesPerPoll=5")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3ComponentConfigurationTest extends CamelTestSupport {
+
+ @Test
+ public void createEndpointWithMinimalConfiguration() throws Exception {
+ S3Component component = new S3Component(context);
+ S3Endpoint endpoint = (S3Endpoint) component.createEndpoint("aws-s3://MyBucket?accessKey=xxx&secretKey=yyy");
+
+ assertEquals("MyBucket", endpoint.getConfiguration().getBucketName());
+ assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
+ assertEquals("yyy", endpoint.getConfiguration().getSecretKey());
+ assertNull(endpoint.getConfiguration().getAmazonS3Client());
+ assertNull(endpoint.getConfiguration().getRegion());
+ assertTrue(endpoint.getConfiguration().isDeleteAfterRead());
+ }
+
+ @Test
+ public void createEndpointWithMinimalConfigurationAndProvidedClient() throws Exception {
+ AmazonS3ClientMock mock = new AmazonS3ClientMock();
+
+ ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("amazonS3Client", mock);
+
+ S3Component component = new S3Component(context);
+ S3Endpoint endpoint = (S3Endpoint) component.createEndpoint("aws-s3://MyBucket?amazonS3Client=#amazonS3Client");
+
+ assertEquals("MyBucket", endpoint.getConfiguration().getBucketName());
+ assertNull(endpoint.getConfiguration().getAccessKey());
+ assertNull(endpoint.getConfiguration().getSecretKey());
+ assertSame(mock, endpoint.getConfiguration().getAmazonS3Client());
+ assertNull(endpoint.getConfiguration().getRegion());
+ assertTrue(endpoint.getConfiguration().isDeleteAfterRead());
+ }
+
+ @Test
+ public void createEndpointWithMaximalConfiguration() throws Exception {
+ S3Component component = new S3Component(context);
+ S3Endpoint endpoint = (S3Endpoint) component.createEndpoint("aws-s3://MyBucket?accessKey=xxx&secretKey=yyy®ion=us-west-1&deleteAfterRead=false");
+
+ assertEquals("MyBucket", endpoint.getConfiguration().getBucketName());
+ assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
+ assertEquals("yyy", endpoint.getConfiguration().getSecretKey());
+ assertNull(endpoint.getConfiguration().getAmazonS3Client());
+ assertEquals("us-west-1", endpoint.getConfiguration().getRegion());
+ assertFalse(endpoint.getConfiguration().isDeleteAfterRead());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void createEndpointWithoutBucketName() throws Exception {
+ S3Component component = new S3Component(context);
+ component.createEndpoint("aws-s3:// ");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void createEndpointWithoutAccessKeyConfiguration() throws Exception {
+ S3Component component = new S3Component(context);
+ component.createEndpoint("aws-sns://MyTopic?secretKey=yyy");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void createEndpointWithoutSecretKeyConfiguration() throws Exception {
+ S3Component component = new S3Component(context);
+ component.createEndpoint("aws-sns://MyTopic?accessKey=xxx");
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.InputStream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class S3ComponentSpringTest extends CamelSpringTestSupport {
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint result;
+
+ @Test
+ public void sendInOnly() throws Exception {
+ result.expectedMessageCount(1);
+
+ Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+ exchange.getIn().setBody("This is my bucket content.");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ assertResultExchange(result.getExchanges().get(0));
+
+ assertResponseMessage(exchange.getIn());
+ }
+
+ @Test
+ public void sendInOut() throws Exception {
+ result.expectedMessageCount(1);
+
+ Exchange exchange = template.send("direct:start", ExchangePattern.InOut, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+ exchange.getIn().setBody("This is my bucket content.");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ assertResultExchange(result.getExchanges().get(0));
+
+ assertResponseMessage(exchange.getOut());
+ }
+
+ private void assertResultExchange(Exchange resultExchange) {
+ assertIsInstanceOf(InputStream.class, resultExchange.getIn().getBody());
+ assertEquals("This is my bucket content.", resultExchange.getIn().getBody(String.class));
+ assertEquals("mycamelbucket", resultExchange.getIn().getHeader(S3Constants.BUCKET_NAME));
+ assertEquals("CamelUnitTest", resultExchange.getIn().getHeader(S3Constants.KEY));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.VERSION_ID)); // not enabled on this bucket
+ assertNull(resultExchange.getIn().getHeader(S3Constants.LAST_MODIFIED));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.E_TAG));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_TYPE));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_ENCODING));
+ assertEquals(0L, resultExchange.getIn().getHeader(S3Constants.CONTENT_LENGTH));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_DISPOSITION));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_MD5));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CACHE_CONTROL));
+ }
+
+ private void assertResponseMessage(Message message) {
+ assertEquals("3a5c8b1ad448bca04584ecb55b836264", message.getHeader(S3Constants.E_TAG));
+ assertNull(message.getHeader(S3Constants.VERSION_ID));
+ }
+
+ @Override
+ protected ClassPathXmlApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml");
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.InputStream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3ComponentTest extends CamelTestSupport {
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint result;
+
+ @Test
+ public void sendInOnly() throws Exception {
+ result.expectedMessageCount(1);
+
+ Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+ exchange.getIn().setBody("This is my bucket content.");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ assertResultExchange(result.getExchanges().get(0));
+
+ assertResponseMessage(exchange.getIn());
+ }
+
+ @Test
+ public void sendInOut() throws Exception {
+ result.expectedMessageCount(1);
+
+ Exchange exchange = template.send("direct:start", ExchangePattern.InOut, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+ exchange.getIn().setBody("This is my bucket content.");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ assertResultExchange(result.getExchanges().get(0));
+
+ assertResponseMessage(exchange.getOut());
+ }
+
+ private void assertResultExchange(Exchange resultExchange) {
+ assertIsInstanceOf(InputStream.class, resultExchange.getIn().getBody());
+ assertEquals("This is my bucket content.", resultExchange.getIn().getBody(String.class));
+ assertEquals("mycamelbucket", resultExchange.getIn().getHeader(S3Constants.BUCKET_NAME));
+ assertEquals("CamelUnitTest", resultExchange.getIn().getHeader(S3Constants.KEY));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.VERSION_ID)); // not enabled on this bucket
+ assertNull(resultExchange.getIn().getHeader(S3Constants.LAST_MODIFIED));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.E_TAG));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_TYPE));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_ENCODING));
+ assertEquals(0L, resultExchange.getIn().getHeader(S3Constants.CONTENT_LENGTH));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_DISPOSITION));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_MD5));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CACHE_CONTROL));
+ }
+
+ private void assertResponseMessage(Message message) {
+ assertEquals("3a5c8b1ad448bca04584ecb55b836264", message.getHeader(S3Constants.E_TAG));
+ assertNull(message.getHeader(S3Constants.VERSION_ID));
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ registry.bind("amazonS3Client", new AmazonS3ClientMock());
+
+ return registry;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client®ion=us-west-1");
+
+ from("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client®ion=us-west-1&maxMessagesPerPoll=5")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3.integration;
+
+import java.io.InputStream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws.s3.S3Constants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("Must be manually tested. Provide your own accessKey and secretKey!")
+public class S3ComponentIntegrationTest extends CamelTestSupport {
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint result;
+
+ @Test
+ public void sendInOnly() throws Exception {
+ result.expectedMessageCount(1);
+
+ Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+ exchange.getIn().setBody("This is my bucket content.");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ assertResultExchange(result.getExchanges().get(0));
+
+ assertResponseMessage(exchange.getIn());
+ }
+
+ @Test
+ public void sendInOut() throws Exception {
+ result.expectedMessageCount(1);
+
+ Exchange exchange = template.send("direct:start", ExchangePattern.InOut, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+ exchange.getIn().setBody("This is my bucket content.");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ assertResultExchange(result.getExchanges().get(0));
+
+ assertResponseMessage(exchange.getOut());
+ }
+
+ private void assertResultExchange(Exchange resultExchange) {
+ assertIsInstanceOf(InputStream.class, resultExchange.getIn().getBody());
+ assertEquals("This is my bucket content.", resultExchange.getIn().getBody(String.class));
+ assertEquals("mycamelbucket", resultExchange.getIn().getHeader(S3Constants.BUCKET_NAME));
+ assertEquals("CamelUnitTest", resultExchange.getIn().getHeader(S3Constants.KEY));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.VERSION_ID)); // not enabled on this bucket
+ assertNotNull(resultExchange.getIn().getHeader(S3Constants.LAST_MODIFIED));
+ assertEquals("3a5c8b1ad448bca04584ecb55b836264", resultExchange.getIn().getHeader(S3Constants.E_TAG));
+ assertEquals("application/octet-stream", resultExchange.getIn().getHeader(S3Constants.CONTENT_TYPE));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_ENCODING));
+ assertEquals(26L, resultExchange.getIn().getHeader(S3Constants.CONTENT_LENGTH));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_DISPOSITION));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_MD5));
+ assertNull(resultExchange.getIn().getHeader(S3Constants.CACHE_CONTROL));
+ }
+
+ private void assertResponseMessage(Message message) {
+ assertEquals("3a5c8b1ad448bca04584ecb55b836264", message.getHeader(S3Constants.E_TAG));
+ assertNull(message.getHeader(S3Constants.VERSION_ID));
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("aws-s3://mycamelbucket?accessKey=xxx&secretKey=yyy®ion=us-west-1");
+
+ from("aws-s3://mycamelbucket?accessKey=xxx&secretKey=yyy®ion=us-west-1")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
Added: camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml (added)
+++ camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml Thu Mar 31 16:46:38 2011
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under
+ the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ <route>
+ <from uri="direct:start"/>
+ <to uri="aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client"/>
+ </route>
+
+ <route>
+ <from uri="aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&maxMessagesPerPoll=5"/>
+ <to uri="mock:result"/>
+ </route>
+ </camelContext>
+
+ <bean id="amazonS3Client" class="org.apache.camel.component.aws.s3.AmazonS3ClientMock"/>
+</beans>
\ No newline at end of file