You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/10/31 11:07:06 UTC
[camel] branch master updated: CAMEL-11969 - Camel-AWS: add a
deleteObject operation to the S3 Producer
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 4710347 CAMEL-11969 - Camel-AWS: add a deleteObject operation to the S3 Producer
4710347 is described below
commit 47103472b7ba13e1558b71e232a707e57a24fe8a
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Oct 31 12:00:38 2017 +0100
CAMEL-11969 - Camel-AWS: add a deleteObject operation to the S3 Producer
---
.../camel/component/aws/s3/S3Operations.java | 1 +
.../apache/camel/component/aws/s3/S3Producer.java | 90 +++++++++++++---------
.../camel/component/aws/s3/AmazonS3ClientMock.java | 2 +-
.../aws/s3/S3ComponentDeleteObjectSpringTest.java | 74 ++++++++++++++++++
.../aws/s3/S3ComponentDeleteObjectTest.java | 84 ++++++++++++++++++++
.../aws/s3/S3ComponentSpringTest-context.xml | 7 +-
6 files changed, 220 insertions(+), 38 deletions(-)
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Operations.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Operations.java
index 28246bb..ab1b7a0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Operations.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Operations.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws.s3;
public enum S3Operations {
copyObject,
+ deleteObject,
deleteBucket,
listBuckets
}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
index c6f7df6..317073c 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Producer.java
@@ -38,6 +38,7 @@ import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectResult;
import com.amazonaws.services.s3.model.DeleteBucketRequest;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -62,20 +63,19 @@ import org.slf4j.LoggerFactory;
import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
/**
- * A Producer which sends messages to the Amazon Web Service Simple Storage Service <a
- * href="http://aws.amazon.com/s3/">AWS S3</a>
+ * A Producer which sends messages to the Amazon Web Service Simple Storage
+ * Service <a href="http://aws.amazon.com/s3/">AWS S3</a>
*/
public class S3Producer extends DefaultProducer {
private static final Logger LOG = LoggerFactory.getLogger(S3Producer.class);
private transient String s3ProducerToString;
-
+
public S3Producer(final Endpoint endpoint) {
super(endpoint);
}
-
@Override
public void process(final Exchange exchange) throws Exception {
S3Operations operation = determineOperation(exchange);
@@ -90,6 +90,9 @@ public class S3Producer extends DefaultProducer {
case copyObject:
copyObject(getEndpoint().getS3Client(), exchange);
break;
+ case deleteObject:
+ deleteObject(getEndpoint().getS3Client(), exchange);
+ break;
case listBuckets:
listBuckets(getEndpoint().getS3Client(), exchange);
break;
@@ -110,7 +113,7 @@ public class S3Producer extends DefaultProducer {
obj = ((WrappedFile<?>)obj).getFile();
}
if (obj instanceof File) {
- filePayload = (File) obj;
+ filePayload = (File)obj;
} else {
throw new InvalidArgumentException("aws-s3: MultiPart upload requires a File input.");
}
@@ -121,8 +124,7 @@ public class S3Producer extends DefaultProducer {
}
final String keyName = determineKey(exchange);
- final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(getConfiguration().getBucketName(),
- keyName, objectMetadata);
+ final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(getConfiguration().getBucketName(), keyName, objectMetadata);
String storageClass = determineStorageClass(exchange);
if (storageClass != null) {
@@ -137,7 +139,8 @@ public class S3Producer extends DefaultProducer {
AccessControlList acl = exchange.getIn().getHeader(S3Constants.ACL, AccessControlList.class);
if (acl != null) {
- // note: if cannedacl and acl are both specified the last one will be used. refer to
+ // note: if cannedacl and acl are both specified the last one will
+ // be used. refer to
// PutObjectRequest#setAccessControlList for more details
initRequest.setAccessControlList(acl);
}
@@ -152,34 +155,24 @@ public class S3Producer extends DefaultProducer {
long filePosition = 0;
-
try {
for (int part = 1; filePosition < contentLength; part++) {
partSize = Math.min(partSize, contentLength - filePosition);
- UploadPartRequest uploadRequest = new UploadPartRequest()
- .withBucketName(getConfiguration().getBucketName()).withKey(keyName)
- .withUploadId(initResponse.getUploadId()).withPartNumber(part)
- .withFileOffset(filePosition)
- .withFile(filePayload)
- .withPartSize(partSize);
+ UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(getConfiguration().getBucketName()).withKey(keyName)
+ .withUploadId(initResponse.getUploadId()).withPartNumber(part).withFileOffset(filePosition).withFile(filePayload).withPartSize(partSize);
LOG.trace("Uploading part [{}] for {}", part, keyName);
partETags.add(getEndpoint().getS3Client().uploadPart(uploadRequest).getPartETag());
filePosition += partSize;
}
- CompleteMultipartUploadRequest compRequest = new
- CompleteMultipartUploadRequest(getConfiguration().getBucketName(),
- keyName,
- initResponse.getUploadId(),
- partETags);
+ CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(getConfiguration().getBucketName(), keyName, initResponse.getUploadId(), partETags);
uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);
} catch (Exception e) {
- getEndpoint().getS3Client().abortMultipartUpload(new AbortMultipartUploadRequest(
- getConfiguration().getBucketName(), keyName, initResponse.getUploadId()));
+ getEndpoint().getS3Client().abortMultipartUpload(new AbortMultipartUploadRequest(getConfiguration().getBucketName(), keyName, initResponse.getUploadId()));
throw e;
}
@@ -208,13 +201,13 @@ public class S3Producer extends DefaultProducer {
obj = ((WrappedFile<?>)obj).getFile();
}
if (obj instanceof File) {
- filePayload = (File) obj;
+ filePayload = (File)obj;
is = new FileInputStream(filePayload);
} else {
is = exchange.getIn().getMandatoryBody(InputStream.class);
baos = determineLengthInputStream(is);
objectMetadata.setContentLength(baos.size());
- is = new ByteArrayInputStream(baos.toByteArray());
+ is = new ByteArrayInputStream(baos.toByteArray());
}
putObjectRequest = new PutObjectRequest(getConfiguration().getBucketName(), determineKey(exchange), is, objectMetadata);
@@ -232,7 +225,8 @@ public class S3Producer extends DefaultProducer {
AccessControlList acl = exchange.getIn().getHeader(S3Constants.ACL, AccessControlList.class);
if (acl != null) {
- // note: if cannedacl and acl are both specified the last one will be used. refer to
+ // note: if cannedacl and acl are both specified the last one will
+ // be used. refer to
// PutObjectRequest#setAccessControlList for more details
putObjectRequest.setAccessControlList(acl);
}
@@ -255,14 +249,14 @@ public class S3Producer extends DefaultProducer {
FileUtil.deleteFile(filePayload);
}
}
-
+
private void copyObject(AmazonS3 s3Client, Exchange exchange) {
String bucketNameDestination;
String destinationKey;
String sourceKey;
String bucketName;
String versionId;
-
+
bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
if (ObjectHelper.isEmpty(bucketName)) {
bucketName = getConfiguration().getBucketName();
@@ -271,7 +265,7 @@ public class S3Producer extends DefaultProducer {
destinationKey = exchange.getIn().getHeader(S3Constants.DESTINATION_KEY, String.class);
bucketNameDestination = exchange.getIn().getHeader(S3Constants.BUCKET_DESTINATION_NAME, String.class);
versionId = exchange.getIn().getHeader(S3Constants.VERSION_ID, String.class);
-
+
if (ObjectHelper.isEmpty(bucketName)) {
throw new IllegalArgumentException("Bucket Name must be specified for copyObject Operation");
}
@@ -291,24 +285,48 @@ public class S3Producer extends DefaultProducer {
copyObjectRequest = new CopyObjectRequest(bucketName, sourceKey, versionId, bucketNameDestination, destinationKey);
}
CopyObjectResult copyObjectResult = s3Client.copyObject(copyObjectRequest);
-
+
Message message = getMessageForResponse(exchange);
message.setHeader(S3Constants.E_TAG, copyObjectResult.getETag());
if (copyObjectResult.getVersionId() != null) {
message.setHeader(S3Constants.VERSION_ID, copyObjectResult.getVersionId());
}
}
-
+
+ private void deleteObject(AmazonS3 s3Client, Exchange exchange) {
+ String sourceKey;
+ String bucketName;
+
+ bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
+ if (ObjectHelper.isEmpty(bucketName)) {
+ bucketName = getConfiguration().getBucketName();
+ }
+ sourceKey = exchange.getIn().getHeader(S3Constants.KEY, String.class);
+
+ if (ObjectHelper.isEmpty(bucketName)) {
+ throw new IllegalArgumentException("Bucket Name must be specified for deleteObject Operation");
+ }
+ if (ObjectHelper.isEmpty(sourceKey)) {
+ throw new IllegalArgumentException("Source Key must be specified for deleteObject Operation");
+ }
+ DeleteObjectRequest deleteObjectRequest;
+ deleteObjectRequest = new DeleteObjectRequest(bucketName, sourceKey);
+ s3Client.deleteObject(deleteObjectRequest);
+
+ Message message = getMessageForResponse(exchange);
+ message.setBody(true);
+ }
+
private void listBuckets(AmazonS3 s3Client, Exchange exchange) {
List<Bucket> bucketsList = s3Client.listBuckets();
-
+
Message message = getMessageForResponse(exchange);
message.setBody(bucketsList);
}
-
+
private void deleteBucket(AmazonS3 s3Client, Exchange exchange) {
String bucketName;
-
+
bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
if (ObjectHelper.isEmpty(bucketName)) {
bucketName = getConfiguration().getBucketName();
@@ -317,7 +335,7 @@ public class S3Producer extends DefaultProducer {
DeleteBucketRequest deleteBucketRequest = new DeleteBucketRequest(bucketName);
s3Client.deleteBucket(deleteBucketRequest);
}
-
+
private S3Operations determineOperation(Exchange exchange) {
S3Operations operation = exchange.getIn().getHeader(S3Constants.S3_OPERATION, S3Operations.class);
if (operation == null) {
@@ -400,7 +418,7 @@ public class S3Producer extends DefaultProducer {
return storageClass;
}
-
+
private ByteArrayOutputStream determineLengthInputStream(InputStream is) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] bytes = new byte[1024];
@@ -425,7 +443,7 @@ public class S3Producer extends DefaultProducer {
@Override
public S3Endpoint getEndpoint() {
- return (S3Endpoint) super.getEndpoint();
+ return (S3Endpoint)super.getEndpoint();
}
}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
index 7dce3b8..c848726 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/AmazonS3ClientMock.java
@@ -358,7 +358,7 @@ public class AmazonS3ClientMock extends AmazonS3Client {
@Override
public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException, AmazonServiceException {
- throw new UnsupportedOperationException();
+ // noop
}
@Override
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentDeleteObjectSpringTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentDeleteObjectSpringTest.java
new file mode 100644
index 0000000..4fedf81
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentDeleteObjectSpringTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class S3ComponentDeleteObjectSpringTest extends CamelSpringTestSupport {
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint result;
+
+ private AmazonS3ClientMock client;
+
+ @Test
+ public void sendIn() throws Exception {
+ result.expectedMessageCount(1);
+
+ template.send("direct:deleteObject", ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(S3Constants.KEY, "camelKey");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ assertResultExchange(result.getExchanges().get(0));
+
+ }
+
+ private void assertResultExchange(Exchange resultExchange) {
+ assertEquals(resultExchange.getIn().getBody(), true);
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+
+ client = new AmazonS3ClientMock();
+ registry.bind("amazonS3Client", client);
+
+ return registry;
+ }
+
+ @Override
+ protected ClassPathXmlApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml");
+ }
+}
\ No newline at end of file
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentDeleteObjectTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentDeleteObjectTest.java
new file mode 100644
index 0000000..ef622c1
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/s3/S3ComponentDeleteObjectTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.s3;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class S3ComponentDeleteObjectTest extends CamelTestSupport {
+
+ @EndpointInject(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint result;
+
+ private AmazonS3ClientMock client;
+
+ @Test
+ public void sendIn() throws Exception {
+ result.expectedMessageCount(1);
+
+ template.send("direct:start", ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(S3Constants.KEY, "camelKey");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ assertResultExchange(result.getExchanges().get(0));
+
+ }
+
+ private void assertResultExchange(Exchange resultExchange) {
+ assertEquals(resultExchange.getIn().getBody(), true);
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+
+ client = new AmazonS3ClientMock();
+ registry.bind("amazonS3Client", client);
+
+ return registry;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ String awsEndpoint = "aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client®ion=us-west-1&operation=deleteObject";
+
+ from("direct:start")
+ .to(awsEndpoint)
+ .to("mock:result");
+
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml
index 9dd8d08..0e7356e 100644
--- a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml
+++ b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml
@@ -37,7 +37,12 @@
<from uri="direct:listBuckets"/>
<to uri="aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&operation=listBuckets"/>
<to uri="mock:result"/>
- </route>
+ </route>
+ <route>
+ <from uri="direct:deleteObject"/>
+ <to uri="aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&operation=deleteObject"/>
+ <to uri="mock:result"/>
+ </route>
<route>
<from uri="aws-s3://mycamelbucket?amazonS3Client=#amazonS3Client&maxMessagesPerPoll=5"/>
<to uri="mock:result"/>
--
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].