You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/03/31 18:46:39 UTC

svn commit: r1087367 [1/2] - in /camel/trunk: components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/ components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/ components/camel-aws/src/test/java/org/apache/came...

Author: cmueller
Date: Thu Mar 31 16:46:38 2011
New Revision: 1087367

URL: http://svn.apache.org/viewvc?rev=1087367&view=rev
Log:
CAMEL-3546: Add S3 support to camel-aws

Added:
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
    camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java
    camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/
    camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml
    camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/aws/AmazonS3ClientMock.java
    camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/aws/AwsS3IntegrationTest.java
    camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/aws/AwsS3Test.java

Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Component.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Defines the <a href="http://aws.amazon.com/s3/">AWS S3 Component</a> 
+ */
+public class S3Component extends DefaultComponent {
+    
+    public S3Component() {
+        super();
+    }
+
+    public S3Component(CamelContext context) {
+        super(context);
+    }
+
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        S3Configuration configuration = new S3Configuration();
+        setProperties(configuration, parameters);
+
+        if (remaining == null || remaining.trim().length() == 0) {
+            throw new IllegalArgumentException("Bucket name must be specified.");
+        }
+        configuration.setBucketName(remaining);
+
+        if (configuration.getAmazonS3Client() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) {
+            throw new IllegalArgumentException("AmazonS3Client or accessKey and secretKey must be specified");
+        }
+
+        S3Endpoint endpoint = new S3Endpoint(uri, getCamelContext(), configuration);
+        return endpoint;
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+
+/**
+ * The AWS S3 component configuration properties
+ * 
+ */
+public class S3Configuration implements Cloneable {
+
+    private String accessKey;
+    private String secretKey;
+    private AmazonS3Client amazonS3Client;
+    
+    private String bucketName;
+    private String region;
+    private boolean deleteAfterRead = true;
+    
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+    
+    public AmazonS3Client getAmazonS3Client() {
+        return amazonS3Client;
+    }
+
+    public void setAmazonS3Client(AmazonS3Client amazonS3Client) {
+        this.amazonS3Client = amazonS3Client;
+    }
+
+    public String getBucketName() {
+        return bucketName;
+    }
+
+    public void setBucketName(String bucketName) {
+        this.bucketName = bucketName;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    public boolean isDeleteAfterRead() {
+        return deleteAfterRead;
+    }
+
+    public void setDeleteAfterRead(boolean deleteAfterRead) {
+        this.deleteAfterRead = deleteAfterRead;
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Constants.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+/**
+ * Constants used in Camel AWS S3 module
+ *
+ */
+public interface S3Constants {
+
+    String BUCKET_NAME = "CamelAwsS3BucketName";
+    String CACHE_CONTROL = "CamelAwsS3ContentControl";
+    String CONTENT_DISPOSITION = "CamelAwsS3ContentDisposition";
+    String CONTENT_ENCODING = "CamelAwsS3ContentEncoding";
+    String CONTENT_LENGTH = "CamelAwsS3ContentLength";
+    String CONTENT_MD5 = "CamelAwsS3ContentMD5";
+    String CONTENT_TYPE = "CamelAwsS3ContentType";
+    String E_TAG = "CamelAwsS3ETag";
+    String KEY = "CamelAwsS3Key";
+    String LAST_MODIFIED = "CamelAwsS3LastModified";
+    String VERSION_ID = "CamelAwsS3VersionId";
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import org.apache.camel.BatchConsumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoFactoryAvailableException;
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Amazon Web Service Simple Storage Service
+ * <a href="http://aws.amazon.com/s3/">AWS S3</a>
+ * 
+ */
+public class S3Consumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+    
+    private static final transient Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
+    
+    private volatile ShutdownRunningTask shutdownRunningTask;
+    private volatile int pendingExchanges;
+
+    public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+        
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Quering objects in bucket [{}]...", bucketName);
+        
+        ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+        listObjectsRequest.setBucketName(bucketName);
+        listObjectsRequest.setMaxKeys(getMaxMessagesPerPoll());
+        
+        ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+        
+        LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(), bucketName);
+        
+        Queue<Exchange> exchanges = createExchanges(listObjects.getObjectSummaries());
+        return processBatch(CastUtils.cast(exchanges));
+    }
+    
+    protected Queue<Exchange> createExchanges(List<S3ObjectSummary> s3ObjectSummaries) {
+        LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
+        
+        Queue<Exchange> answer = new LinkedList<Exchange>();
+        for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
+            S3Object s3Object = getAmazonS3Client().getObject(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey());
+            Exchange exchange = getEndpoint().createExchange(s3Object);
+            answer.add(exchange);
+        }
+
+        return answer;
+    }
+    
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "S3ConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange [{}]...", exchange);
+
+            getProcessor().process(exchange);
+        }
+
+        return total;
+    }
+    
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            if (getConfiguration().isDeleteAfterRead()) {
+                String bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
+                String key = exchange.getIn().getHeader(S3Constants.KEY, String.class);
+                
+                LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
+                
+                getAmazonS3Client().deleteObject(bucketName, key);
+                
+                LOG.trace("Object deleted");
+            }
+        } catch (AmazonClientException e) {
+            LOG.warn("Error occurred during deleting object", e);
+            exchange.setException(e);
+        }
+    }
+
+    /**
+     * Strategy when processing the exchange failed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processRollback(Exchange exchange) {
+        Exception cause = exchange.getException();
+        if (cause != null) {
+            LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
+        } else {
+            LOG.warn("Exchange failed, so rolling back message status: {}", exchange);
+        }
+    }
+    
+    public boolean isBatchAllowed() {
+        // stop if we are not running
+        boolean answer = isRunAllowed();
+        if (!answer) {
+            return false;
+        }
+
+        if (shutdownRunningTask == null) {
+            // we are not shutting down so continue to run
+            return true;
+        }
+
+        // we are shutting down so only continue if we are configured to complete all tasks
+        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
+    }
+
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // store a reference what to do in case when shutting down and we have pending messages
+        this.shutdownRunningTask = shutdownRunningTask;
+        // do not defer shutdown
+        return false;
+    }
+
+    public int getPendingExchangesSize() {
+        // only return the real pending size in case we are configured to complete all tasks
+        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
+            return pendingExchanges;
+        } else {
+            return 0;
+        }
+    }
+
+    public void prepareShutdown() {
+     // noop
+    }
+    
+    protected S3Configuration getConfiguration() {
+        return getEndpoint().getConfiguration();
+    }
+    
+    protected AmazonS3Client getAmazonS3Client() {
+        return getEndpoint().getS3Client();
+    }
+    
+    @Override
+    public S3Endpoint getEndpoint() {
+        return (S3Endpoint) super.getEndpoint();
+    }
+    
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        getEndpoint().setMaxMessagesPerPoll(maxMessagesPerPoll);
+    }
+    
+    public int getMaxMessagesPerPoll() {
+        return getEndpoint().getMaxMessagesPerPoll();
+    }
+    
+    @Override
+    public String toString() {
+        return "S3Consumer[" + DefaultEndpoint.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.util.List;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.Bucket;
+import com.amazonaws.services.s3.model.CreateBucketRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines the <a href="http://camel.apache.org/aws.html">AWS S3 Endpoint</a>.  
+ *
+ */
+public class S3Endpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3Endpoint.class);
+
+    private AmazonS3Client s3Client;
+    private S3Configuration configuration;
+    private int maxMessagesPerPoll;
+    
+    public S3Endpoint(String uri, CamelContext context, S3Configuration configuration) {
+        super(uri, context);
+        this.configuration = configuration;
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        S3Consumer s3Consumer = new S3Consumer(this, processor);
+        configureConsumer(s3Consumer);
+        return s3Consumer;
+    }
+
+    public Producer createProducer() throws Exception {
+        return new S3Producer(this);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+        
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Quering whether bucket [{}] already exists...", bucketName);
+        
+        List<Bucket> buckets = getS3Client().listBuckets();
+        for (Bucket bucket : buckets) {
+            if (bucketName.equals(bucket.getName())) {
+                LOG.trace("Bucket [{}] already exist", bucketName);
+                return;
+            }
+        }
+        
+        LOG.trace("Bucket [{}] doesn't exist yet", bucketName);
+        
+        // creates the new bucket because it doesn't exist yet
+        CreateBucketRequest createBucketRequest = new CreateBucketRequest(getConfiguration().getBucketName());
+        if (getConfiguration().getRegion() != null) {
+            createBucketRequest.setRegion(getConfiguration().getRegion());
+        }
+        
+        LOG.trace("Creating bucket [{}] in region [{}] with request [{}]...", new Object[]{configuration.getBucketName(), configuration.getRegion(), createBucketRequest});
+        
+        getS3Client().createBucket(createBucketRequest);
+        
+        LOG.trace("Bucket created");
+    }
+
+    public Exchange createExchange(S3Object s3Object) {
+        return createExchange(getExchangePattern(), s3Object);
+    }
+
+    public Exchange createExchange(ExchangePattern pattern, S3Object s3Object) {
+        LOG.trace("Getting object with key [{}] from bucket [{}]...", s3Object.getKey(), s3Object.getBucketName());
+        
+        ObjectMetadata objectMetadata = s3Object.getObjectMetadata();
+        
+        LOG.trace("Got object [{}]", s3Object);
+        
+        Exchange exchange = new DefaultExchange(this, pattern);
+        Message message = exchange.getIn();
+        message.setBody(s3Object.getObjectContent());
+        message.setHeader(S3Constants.KEY, s3Object.getKey());
+        message.setHeader(S3Constants.BUCKET_NAME, s3Object.getBucketName());
+        message.setHeader(S3Constants.E_TAG, objectMetadata.getETag());
+        message.setHeader(S3Constants.LAST_MODIFIED, objectMetadata.getLastModified());
+        message.setHeader(S3Constants.VERSION_ID, objectMetadata.getVersionId());
+        message.setHeader(S3Constants.LAST_MODIFIED, objectMetadata.getLastModified());
+        message.setHeader(S3Constants.CONTENT_TYPE, objectMetadata.getContentType());
+        message.setHeader(S3Constants.CONTENT_MD5, objectMetadata.getContentMD5());
+        message.setHeader(S3Constants.CONTENT_LENGTH, objectMetadata.getContentLength());
+        message.setHeader(S3Constants.CONTENT_ENCODING, objectMetadata.getContentEncoding());
+        message.setHeader(S3Constants.CONTENT_DISPOSITION, objectMetadata.getContentDisposition());
+        message.setHeader(S3Constants.CACHE_CONTROL, objectMetadata.getCacheControl());
+        
+        return exchange;
+    }
+
+    public S3Configuration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(S3Configuration configuration) {
+        this.configuration = configuration;
+    }
+    
+    public void setS3Client(AmazonS3Client s3Client) {
+        this.s3Client = s3Client;
+    }
+    
+    public AmazonS3Client getS3Client() {
+        if (s3Client == null) {
+            s3Client = configuration.getAmazonS3Client() != null
+                ? configuration.getAmazonS3Client() : createS3Client();
+        }
+        
+        return s3Client;
+    }
+
+    /**
+     * Provide the possibility to override this method for an mock implementation
+     *
+     * @return AmazonS3Client
+     */
+    AmazonS3Client createS3Client() {
+        AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+        return new AmazonS3Client(credentials);
+    }
+    
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java (added)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.InputStream;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Amazon Web Service Simple Storage Service
+ * <a href="http://aws.amazon.com/s3/">AWS S3</a>
+ */
+public class S3Producer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(S3Producer.class);
+
+    public S3Producer(Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        ObjectMetadata objectMetadata = new ObjectMetadata();
+        
+        PutObjectRequest putObjectRequest = new PutObjectRequest(
+                getConfiguration().getBucketName(),
+                determineKey(exchange),
+                exchange.getIn().getMandatoryBody(InputStream.class),
+                objectMetadata);
+        
+        LOG.trace("Put object [{}] from exchange [{}]...", putObjectRequest, exchange);
+        
+        PutObjectResult putObjectResult = getEndpoint().getS3Client().putObject(putObjectRequest);
+
+        LOG.trace("Received result [{}]", putObjectResult);
+        
+        Message message = getMessageForResponse(exchange);
+        message.setHeader(S3Constants.E_TAG, putObjectResult.getETag());
+        if (putObjectResult.getVersionId() != null) {
+            message.setHeader(S3Constants.VERSION_ID, putObjectResult.getVersionId());            
+        }
+    }
+    
+    private String determineKey(Exchange exchange) {
+        String key = exchange.getIn().getHeader(S3Constants.KEY, String.class);
+        if (key == null) {
+            throw new IllegalArgumentException("AWS S3 Key header missing.");
+        }
+        return key;
+    }
+
+    private Message getMessageForResponse(Exchange exchange) {
+        if (exchange.getPattern().isOutCapable()) {
+            Message out = exchange.getOut();
+            out.copyFrom(exchange.getIn());
+            return out;
+        }
+        
+        return exchange.getIn();
+    }
+    
+    protected S3Configuration getConfiguration() {
+        return getEndpoint().getConfiguration();
+    }
+    
+    @Override
+    public String toString() {
+        return "S3Producer[" + DefaultEndpoint.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+    }
+    
+    @Override
+    public S3Endpoint getEndpoint() {
+        return (S3Endpoint) super.getEndpoint();
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3 (added)
+++ camel/trunk/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-s3 Thu Mar 31 16:46:38 2011
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+class=org.apache.camel.component.aws.s3.S3Component

Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.HttpMethod;
+import com.amazonaws.Request;
+import com.amazonaws.http.HttpMethodName;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ResponseMetadata;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.Bucket;
+import com.amazonaws.services.s3.model.BucketLoggingConfiguration;
+import com.amazonaws.services.s3.model.BucketNotificationConfiguration;
+import com.amazonaws.services.s3.model.BucketPolicy;
+import com.amazonaws.services.s3.model.BucketVersioningConfiguration;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyObjectResult;
+import com.amazonaws.services.s3.model.CreateBucketRequest;
+import com.amazonaws.services.s3.model.DeleteBucketRequest;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteVersionRequest;
+import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListBucketsRequest;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListPartsRequest;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.MultipartUploadListing;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.Owner;
+import com.amazonaws.services.s3.model.PartListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.SetBucketLoggingConfigurationRequest;
+import com.amazonaws.services.s3.model.SetBucketVersioningConfigurationRequest;
+import com.amazonaws.services.s3.model.StorageClass;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.services.s3.model.VersionListing;
+
+public class AmazonS3ClientMock extends AmazonS3Client {
+    
+    List<S3Object> objects = new ArrayList<S3Object>();    
+    
+    public AmazonS3ClientMock() {
+        super(null);
+    }
+
+    @Override
+    public VersionListing listNextBatchOfVersions(VersionListing previousVersionListing) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public VersionListing listVersions(String bucketName, String prefix) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public VersionListing listVersions(String bucketName, String prefix, String keyMarker, String versionIdMarker, String delimiter, Integer maxKeys) 
+        throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public VersionListing listVersions(ListVersionsRequest listVersionsRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ObjectListing listObjects(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ObjectListing listObjects(String bucketName, String prefix) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) throws AmazonClientException, AmazonServiceException {
+        ObjectListing objectListing = new ObjectListing();
+        int capacity = listObjectsRequest.getMaxKeys();
+        
+        for (int index = 0; index < objects.size() && index < capacity; index++) {
+            S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
+            s3ObjectSummary.setBucketName(objects.get(index).getBucketName());
+            s3ObjectSummary.setKey(objects.get(index).getKey());
+            
+            objectListing.getObjectSummaries().add(s3ObjectSummary);
+        }
+
+        return objectListing;
+    }
+
+    @Override
+    public ObjectListing listNextBatchOfObjects(ObjectListing previousObjectListing) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Owner getS3AccountOwner() throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<Bucket> listBuckets(ListBucketsRequest listBucketsRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<Bucket> listBuckets() throws AmazonClientException, AmazonServiceException {
+        return new ArrayList<Bucket>();
+    }
+
+    @Override
+    public String getBucketLocation(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Bucket createBucket(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Bucket createBucket(String bucketName, Region region) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Bucket createBucket(String bucketName, String region) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Bucket createBucket(CreateBucketRequest createBucketRequest) throws AmazonClientException, AmazonServiceException {
+        Bucket bucket = new Bucket();
+        bucket.setName(createBucketRequest.getBucketName());
+        bucket.setCreationDate(new Date());
+        bucket.setOwner(new Owner("c2efc7302b9011ba9a78a92ac5fd1cd47b61790499ab5ddf5a37c31f0638a8fc ", "Christian Mueller"));
+        return bucket;
+    }
+
+    @Override
+    public AccessControlList getObjectAcl(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public AccessControlList getObjectAcl(String bucketName, String key, String versionId) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setObjectAcl(String bucketName, String key, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setObjectAcl(String bucketName, String key, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setObjectAcl(String bucketName, String key, String versionId, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setObjectAcl(String bucketName, String key, String versionId, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public AccessControlList getBucketAcl(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBucketAcl(String bucketName, AccessControlList acl) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBucketAcl(String bucketName, CannedAccessControlList acl) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ObjectMetadata getObjectMetadata(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public S3Object getObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+        for (S3Object s3Object : objects) {
+            if (bucketName.equals(s3Object.getBucketName()) && key.equals(s3Object.getKey())) {
+                return s3Object;
+            }
+        }
+        
+        return null;
+    }
+
+    @Override
+    public boolean doesBucketExist(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void changeObjectStorageClass(String bucketName, String key, StorageClass newStorageClass) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ObjectMetadata getObject(GetObjectRequest getObjectRequest, File destinationFile) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteBucket(String bucketName) throws AmazonClientException, AmazonServiceException {
+        // noop
+    }
+
+    @Override
+    public void deleteBucket(DeleteBucketRequest deleteBucketRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public PutObjectResult putObject(String bucketName, String key, File file) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException, AmazonServiceException {
+        S3Object s3Object = new S3Object();
+        s3Object.setBucketName(putObjectRequest.getBucketName());
+        s3Object.setKey(putObjectRequest.getKey());
+        s3Object.setObjectContent(putObjectRequest.getInputStream());
+        objects.add(s3Object);
+        
+        PutObjectResult putObjectResult = new PutObjectResult();
+        putObjectResult.setETag("3a5c8b1ad448bca04584ecb55b836264");
+        return putObjectResult;
+    }
+
+    @Override
+    public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName, String destinationKey) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteObject(String bucketName, String key) throws AmazonClientException, AmazonServiceException {
+        //noop
+    }
+
+    @Override
+    public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteVersion(String bucketName, String key, String versionId) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteVersion(DeleteVersionRequest deleteVersionRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBucketVersioningConfiguration(SetBucketVersioningConfigurationRequest setBucketVersioningConfigurationRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BucketVersioningConfiguration getBucketVersioningConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBucketNotificationConfiguration(String bucketName, BucketNotificationConfiguration bucketNotificationConfiguration) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BucketNotificationConfiguration getBucketNotificationConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BucketLoggingConfiguration getBucketLoggingConfiguration(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBucketLoggingConfiguration(SetBucketLoggingConfigurationRequest setBucketLoggingConfigurationRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BucketPolicy getBucketPolicy(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBucketPolicy(String bucketName, String policyText) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteBucketPolicy(String bucketName) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public URL generatePresignedUrl(String bucketName, String key, Date expiration) throws AmazonClientException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public URL generatePresignedUrl(String bucketName, String key, Date expiration, HttpMethod method) throws AmazonClientException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public URL generatePresignedUrl(GeneratePresignedUrlRequest generatePresignedUrlRequest) throws AmazonClientException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void abortMultipartUpload(AbortMultipartUploadRequest abortMultipartUploadRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest completeMultipartUploadRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest initiateMultipartUploadRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public PartListing listParts(ListPartsRequest listPartsRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public UploadPartResult uploadPart(UploadPartRequest uploadPartRequest) throws AmazonClientException, AmazonServiceException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public S3ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected Request<Void> createRequest(String bucketName, String key, AmazonWebServiceRequest originalRequest) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected <T> void signRequest(Request<T> request, HttpMethodName methodName, String bucketName, String key) {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3BatchConsumerTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3BatchConsumerTest extends CamelTestSupport {
+    
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+        
+    @Test
+    public void receiveBatch() throws Exception {
+        mock.expectedMessageCount(5);
+        assertMockEndpointsSatisfied();
+        
+        mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+        mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+        mock.message(2).property(Exchange.BATCH_INDEX).isEqualTo(2);
+        mock.message(3).property(Exchange.BATCH_INDEX).isEqualTo(3);
+        mock.message(4).property(Exchange.BATCH_INDEX).isEqualTo(4);
+        mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(4).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 5);
+    }
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        
+        AmazonS3ClientMock clientMock = new AmazonS3ClientMock();
+        // add 6 messages, one more we will poll
+        for (int counter = 0; counter < 6; counter++) {
+            S3Object s3Object = new S3Object();
+            s3Object.setBucketName("mycamelbucket");
+            s3Object.setKey("counter-" + counter);
+            
+            clientMock.objects.add(s3Object);
+        }
+        
+        registry.bind("amazonS3Client", clientMock);
+        
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&region=us-west-1&delay=5000&maxMessagesPerPoll=5")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentConfigurationTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3ComponentConfigurationTest extends CamelTestSupport {
+    
+    @Test
+    public void createEndpointWithMinimalConfiguration() throws Exception {
+        S3Component component = new S3Component(context);
+        S3Endpoint endpoint = (S3Endpoint) component.createEndpoint("aws-s3://MyBucket?accessKey=xxx&secretKey=yyy");
+        
+        assertEquals("MyBucket", endpoint.getConfiguration().getBucketName());
+        assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyy", endpoint.getConfiguration().getSecretKey());
+        assertNull(endpoint.getConfiguration().getAmazonS3Client());
+        assertNull(endpoint.getConfiguration().getRegion());
+        assertTrue(endpoint.getConfiguration().isDeleteAfterRead());
+    }
+    
+    @Test
+    public void createEndpointWithMinimalConfigurationAndProvidedClient() throws Exception {
+        AmazonS3ClientMock mock = new AmazonS3ClientMock();
+        
+        ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("amazonS3Client", mock);
+        
+        S3Component component = new S3Component(context);
+        S3Endpoint endpoint = (S3Endpoint) component.createEndpoint("aws-s3://MyBucket?amazonS3Client=#amazonS3Client");
+        
+        assertEquals("MyBucket", endpoint.getConfiguration().getBucketName());
+        assertNull(endpoint.getConfiguration().getAccessKey());
+        assertNull(endpoint.getConfiguration().getSecretKey());
+        assertSame(mock, endpoint.getConfiguration().getAmazonS3Client());
+        assertNull(endpoint.getConfiguration().getRegion());
+        assertTrue(endpoint.getConfiguration().isDeleteAfterRead());
+    }
+    
+    @Test
+    public void createEndpointWithMaximalConfiguration() throws Exception {
+        S3Component component = new S3Component(context);
+        S3Endpoint endpoint = (S3Endpoint) component.createEndpoint("aws-s3://MyBucket?accessKey=xxx&secretKey=yyy&region=us-west-1&deleteAfterRead=false");
+        
+        assertEquals("MyBucket", endpoint.getConfiguration().getBucketName());
+        assertEquals("xxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyy", endpoint.getConfiguration().getSecretKey());
+        assertNull(endpoint.getConfiguration().getAmazonS3Client());
+        assertEquals("us-west-1", endpoint.getConfiguration().getRegion());
+        assertFalse(endpoint.getConfiguration().isDeleteAfterRead());
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void createEndpointWithoutBucketName() throws Exception {
+        S3Component component = new S3Component(context);
+        component.createEndpoint("aws-s3:// ");
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void createEndpointWithoutAccessKeyConfiguration() throws Exception {
+        S3Component component = new S3Component(context);
+        component.createEndpoint("aws-sns://MyTopic?secretKey=yyy");
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void createEndpointWithoutSecretKeyConfiguration() throws Exception {
+        S3Component component = new S3Component(context);
+        component.createEndpoint("aws-sns://MyTopic?accessKey=xxx");
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentSpringTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.InputStream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class S3ComponentSpringTest extends CamelSpringTestSupport {
+    
+    @EndpointInject(uri = "direct:start")
+    private ProducerTemplate template;
+    
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+    
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(1);
+        
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+                exchange.getIn().setBody("This is my bucket content.");
+            }
+        });
+        
+        assertMockEndpointsSatisfied();
+        
+        assertResultExchange(result.getExchanges().get(0));
+        
+        assertResponseMessage(exchange.getIn());
+    }
+
+    @Test
+    public void sendInOut() throws Exception {
+        result.expectedMessageCount(1);
+        
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOut, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+                exchange.getIn().setBody("This is my bucket content.");
+            }
+        });
+        
+        assertMockEndpointsSatisfied();
+        
+        assertResultExchange(result.getExchanges().get(0));
+        
+        assertResponseMessage(exchange.getOut());
+    }
+    
+    private void assertResultExchange(Exchange resultExchange) {
+        assertIsInstanceOf(InputStream.class, resultExchange.getIn().getBody());
+        assertEquals("This is my bucket content.", resultExchange.getIn().getBody(String.class));
+        assertEquals("mycamelbucket", resultExchange.getIn().getHeader(S3Constants.BUCKET_NAME));
+        assertEquals("CamelUnitTest", resultExchange.getIn().getHeader(S3Constants.KEY));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.VERSION_ID)); // not enabled on this bucket
+        assertNull(resultExchange.getIn().getHeader(S3Constants.LAST_MODIFIED));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.E_TAG));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_TYPE));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_ENCODING));
+        assertEquals(0L, resultExchange.getIn().getHeader(S3Constants.CONTENT_LENGTH));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_DISPOSITION));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_MD5));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CACHE_CONTROL));
+    }
+    
+    private void assertResponseMessage(Message message) {
+        assertEquals("3a5c8b1ad448bca04584ecb55b836264", message.getHeader(S3Constants.E_TAG));
+        assertNull(message.getHeader(S3Constants.VERSION_ID));
+    }
+
+    @Override
+    protected ClassPathXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml");
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import java.io.InputStream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3ComponentTest extends CamelTestSupport {
+    
+    @EndpointInject(uri = "direct:start")
+    private ProducerTemplate template;
+    
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+    
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(1);
+        
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+                exchange.getIn().setBody("This is my bucket content.");
+            }
+        });
+        
+        assertMockEndpointsSatisfied();
+        
+        assertResultExchange(result.getExchanges().get(0));
+        
+        assertResponseMessage(exchange.getIn());
+    }
+
+    @Test
+    public void sendInOut() throws Exception {
+        result.expectedMessageCount(1);
+        
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOut, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+                exchange.getIn().setBody("This is my bucket content.");
+            }
+        });
+        
+        assertMockEndpointsSatisfied();
+        
+        assertResultExchange(result.getExchanges().get(0));
+        
+        assertResponseMessage(exchange.getOut());
+    }
+    
+    private void assertResultExchange(Exchange resultExchange) {
+        assertIsInstanceOf(InputStream.class, resultExchange.getIn().getBody());
+        assertEquals("This is my bucket content.", resultExchange.getIn().getBody(String.class));
+        assertEquals("mycamelbucket", resultExchange.getIn().getHeader(S3Constants.BUCKET_NAME));
+        assertEquals("CamelUnitTest", resultExchange.getIn().getHeader(S3Constants.KEY));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.VERSION_ID)); // not enabled on this bucket
+        assertNull(resultExchange.getIn().getHeader(S3Constants.LAST_MODIFIED));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.E_TAG));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_TYPE));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_ENCODING));
+        assertEquals(0L, resultExchange.getIn().getHeader(S3Constants.CONTENT_LENGTH));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_DISPOSITION));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_MD5));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CACHE_CONTROL));
+    }
+    
+    private void assertResponseMessage(Message message) {
+        assertEquals("3a5c8b1ad448bca04584ecb55b836264", message.getHeader(S3Constants.E_TAG));
+        assertNull(message.getHeader(S3Constants.VERSION_ID));
+    }
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("amazonS3Client", new AmazonS3ClientMock());
+        
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&region=us-west-1");
+                
+                from("aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&region=us-west-1&maxMessagesPerPoll=5")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/integration/S3ComponentIntegrationTest.java Thu Mar 31 16:46:38 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3.integration;
+
+import java.io.InputStream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws.s3.S3Constants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("Must be manually tested. Provide your own accessKey and secretKey!")
+public class S3ComponentIntegrationTest extends CamelTestSupport {
+    
+    @EndpointInject(uri = "direct:start")
+    private ProducerTemplate template;
+    
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+    
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(1);
+        
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+                exchange.getIn().setBody("This is my bucket content.");
+            }
+        });
+        
+        assertMockEndpointsSatisfied();
+        
+        assertResultExchange(result.getExchanges().get(0));
+        
+        assertResponseMessage(exchange.getIn());
+    }
+
+    @Test
+    public void sendInOut() throws Exception {
+        result.expectedMessageCount(1);
+        
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOut, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(S3Constants.KEY, "CamelUnitTest");
+                exchange.getIn().setBody("This is my bucket content.");
+            }
+        });
+        
+        assertMockEndpointsSatisfied();
+        
+        assertResultExchange(result.getExchanges().get(0));
+        
+        assertResponseMessage(exchange.getOut());
+    }
+    
+    private void assertResultExchange(Exchange resultExchange) {
+        assertIsInstanceOf(InputStream.class, resultExchange.getIn().getBody());
+        assertEquals("This is my bucket content.", resultExchange.getIn().getBody(String.class));
+        assertEquals("mycamelbucket", resultExchange.getIn().getHeader(S3Constants.BUCKET_NAME));
+        assertEquals("CamelUnitTest", resultExchange.getIn().getHeader(S3Constants.KEY));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.VERSION_ID)); // not enabled on this bucket
+        assertNotNull(resultExchange.getIn().getHeader(S3Constants.LAST_MODIFIED));
+        assertEquals("3a5c8b1ad448bca04584ecb55b836264", resultExchange.getIn().getHeader(S3Constants.E_TAG));
+        assertEquals("application/octet-stream", resultExchange.getIn().getHeader(S3Constants.CONTENT_TYPE));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_ENCODING));
+        assertEquals(26L, resultExchange.getIn().getHeader(S3Constants.CONTENT_LENGTH));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_DISPOSITION));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CONTENT_MD5));
+        assertNull(resultExchange.getIn().getHeader(S3Constants.CACHE_CONTROL));
+    }
+    
+    private void assertResponseMessage(Message message) {
+        assertEquals("3a5c8b1ad448bca04584ecb55b836264", message.getHeader(S3Constants.E_TAG));
+        assertNull(message.getHeader(S3Constants.VERSION_ID));
+    }
+    
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("aws-s3://mycamelbucket?accessKey=xxx&secretKey=yyy&region=us-west-1");
+                
+                from("aws-s3://mycamelbucket?accessKey=xxx&secretKey=yyy&region=us-west-1")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml?rev=1087367&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml (added)
+++ camel/trunk/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml Thu Mar 31 16:46:38 2011
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version
+  2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+  applicable law or agreed to in writing, software distributed under
+  the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+  OR CONDITIONS OF ANY KIND, either express or implied. See the
+  License for the specific language governing permissions and
+  limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <to uri="aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client"/>
+        </route>
+        
+        <route>
+            <from uri="aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&amp;maxMessagesPerPoll=5"/>
+            <to uri="mock:result"/>
+        </route>
+    </camelContext>
+
+    <bean id="amazonS3Client" class="org.apache.camel.component.aws.s3.AmazonS3ClientMock"/>
+</beans>
\ No newline at end of file