You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/10/16 12:17:52 UTC
[camel] 01/03: CAMEL-12884 - Camel-AWS Lambda: Add support for
event source mapping
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5348a9600fcd1fb964dc390f60cc258c5505f8a6
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Oct 16 13:49:32 2018 +0200
CAMEL-12884 - Camel-AWS Lambda: Add support for event source mapping
---
.../component/aws/lambda/LambdaConstants.java | 3 +-
.../component/aws/lambda/LambdaOperations.java | 3 +-
.../camel/component/aws/lambda/LambdaProducer.java | 57 ++++++++++++++++++----
.../aws/lambda/AmazonLambdaClientMock.java | 7 ++-
.../aws/lambda/LambdaComponentSpringTest.java | 17 +++++++
.../component/aws/lambda/LambdaProducerTest.java | 20 ++++++++
.../lambda/LambdaComponentSpringTest-context.xml | 5 +-
7 files changed, 98 insertions(+), 14 deletions(-)
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
index 95ced62..ce64f5d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
@@ -43,5 +43,6 @@ public interface LambdaConstants {
String SDK_REQUEST_TIMEOUT = "CamelAwsLambdaSdkRequestTimeout";
String SECURITY_GROUP_IDS = "CamelAwsLambdaSecurityGroupIds";
String SUBNET_IDS = "CamelAwsLambdaSubnetIds";
-
+ String EVENT_SOURCE_ARN = "CamelAwsLambdaEventSourceArn";
+ String EVENT_SOURCE_BATCH_SIZE = "CamelAwsLambdaEventSourceBatchSize";
}
\ No newline at end of file
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
index 1d24bfe..23a9491 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
@@ -23,5 +23,6 @@ public enum LambdaOperations {
createFunction,
deleteFunction,
invokeFunction,
- updateFunction
+ updateFunction,
+ createEventSourceMapping
}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
index 2fcb27c..d52a7c3 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.aws.lambda;
+import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
@@ -23,8 +25,19 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.lambda.AWSLambda;
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingRequest;
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
import com.amazonaws.services.lambda.model.CreateFunctionRequest;
import com.amazonaws.services.lambda.model.CreateFunctionResult;
import com.amazonaws.services.lambda.model.DeadLetterConfig;
@@ -42,16 +55,6 @@ import com.amazonaws.services.lambda.model.UpdateFunctionCodeRequest;
import com.amazonaws.services.lambda.model.UpdateFunctionCodeResult;
import com.amazonaws.services.lambda.model.VpcConfig;
import com.amazonaws.util.IOUtils;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
/**
* A Producer which sends messages to the Amazon Web Service Lambda <a
@@ -86,6 +89,9 @@ public class LambdaProducer extends DefaultProducer {
case updateFunction:
updateFunction(getEndpoint().getAwsLambdaClient(), exchange);
break;
+ case createEventSourceMapping:
+ createEventSourceMapping(getEndpoint().getAwsLambdaClient(), exchange);
+ break;
default:
throw new IllegalArgumentException("Unsupported operation");
}
@@ -341,6 +347,37 @@ public class LambdaProducer extends DefaultProducer {
message.setBody(result);
}
+ private void createEventSourceMapping(AWSLambda lambdaClient, Exchange exchange) {
+ CreateEventSourceMappingResult result;
+ try {
+ CreateEventSourceMappingRequest request = new CreateEventSourceMappingRequest().withFunctionName(getConfiguration().getFunction());
+ if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_ARN))) {
+ request.withEventSourceArn(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_ARN, String.class));
+ } else {
+ throw new IllegalArgumentException("Event Source Arn must be specified");
+ }
+ if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE))) {
+ Integer batchSize = exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, Integer.class);
+ request.withBatchSize(batchSize);
+ }
+ if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) {
+ Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class);
+ request.withSdkClientExecutionTimeout(timeout);
+ }
+
+ if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) {
+ Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class);
+ request.withSdkRequestTimeout(timeout);
+ }
+ result = lambdaClient.createEventSourceMapping(request);
+ } catch (AmazonServiceException ase) {
+ LOG.trace("createEventSourceMapping command returned the error code {}", ase.getErrorCode());
+ throw ase;
+ }
+ Message message = getMessageForResponse(exchange);
+ message.setBody(result);
+ }
+
private LambdaOperations determineOperation(Exchange exchange) {
LambdaOperations operation = exchange.getIn().getHeader(LambdaConstants.OPERATION, LambdaOperations.class);
if (operation == null) {
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
index 8c4ab2b..8e6ed76 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
@@ -111,7 +111,12 @@ public class AmazonLambdaClientMock extends AbstractAWSLambda {
@Override
public CreateEventSourceMappingResult createEventSourceMapping(CreateEventSourceMappingRequest createEventSourceMappingRequest) {
- throw new UnsupportedOperationException();
+ CreateEventSourceMappingResult result = new CreateEventSourceMappingResult();
+ result.setBatchSize(100);
+ result.setFunctionArn("arn:aws:lambda:eu-central-1:643534317684:function:" + createEventSourceMappingRequest.getFunctionName());
+ result.setState("Enabled");
+ result.setEventSourceArn("arn:aws:sqs:eu-central-1:643534317684:testqueue");
+ return result;
}
@Override
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
index 400bd02..2c9f9fa 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.aws.lambda;
import java.io.*;
+
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
import com.amazonaws.services.lambda.model.CreateFunctionResult;
import com.amazonaws.services.lambda.model.DeleteFunctionResult;
import com.amazonaws.services.lambda.model.GetFunctionResult;
@@ -111,6 +113,21 @@ public class LambdaComponentSpringTest extends CamelSpringTestSupport {
assertNotNull(exchange.getOut().getBody(String.class));
assertEquals(exchange.getOut().getBody(String.class), "{\"Hello\":\"Camel\"}");
}
+
+ @Test
+ public void lambdaCreateEventSourceMappingTest() throws Exception {
+ Exchange exchange = template.send("direct:createEventSourceMapping", ExchangePattern.InOut, new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_ARN, "arn:aws:sqs:eu-central-1:643534317684:testqueue");
+ exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, 100);
+ }
+ });
+ assertMockEndpointsSatisfied();
+
+ CreateEventSourceMappingResult result = exchange.getOut().getBody(CreateEventSourceMappingResult.class);
+ assertEquals(result.getFunctionArn(), "arn:aws:lambda:eu-central-1:643534317684:function:GetHelloWithName");
+ }
@Override
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
index 006358e..590ad59 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.aws.lambda;
import java.io.*;
+
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
import com.amazonaws.services.lambda.model.CreateFunctionResult;
import com.amazonaws.services.lambda.model.DeleteFunctionResult;
import com.amazonaws.services.lambda.model.GetFunctionResult;
@@ -152,6 +154,21 @@ public class LambdaProducerTest extends CamelTestSupport {
assertNotNull(exchange.getOut().getBody(String.class));
assertEquals(exchange.getOut().getBody(String.class), "{\"Hello\":\"Camel\"}");
}
+
+ @Test
+ public void lambdaCreateEventSourceMappingTest() throws Exception {
+ Exchange exchange = template.send("direct:createEventSourceMapping", ExchangePattern.InOut, new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_ARN, "arn:aws:sqs:eu-central-1:643534317684:testqueue");
+ exchange.getIn().setHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, 100);
+ }
+ });
+ assertMockEndpointsSatisfied();
+
+ CreateEventSourceMappingResult result = exchange.getOut().getBody(CreateEventSourceMappingResult.class);
+ assertEquals(result.getFunctionArn(), "arn:aws:lambda:eu-central-1:643534317684:function:GetHelloWithName");
+ }
@Override
@@ -194,6 +211,9 @@ public class LambdaProducerTest extends CamelTestSupport {
.to("aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=updateFunction")
.to("mock:result");
+ from("direct:createEventSourceMapping")
+ .to("aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=createEventSourceMapping")
+ .to("mock:result");
}
};
}
diff --git a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
index 413404a..c3211dd 100644
--- a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
+++ b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
@@ -50,7 +50,10 @@
<to uri="aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&amp;operation=deleteFunction"/>
</route>
-
+ <route>
+ <from uri="direct:createEventSourceMapping"/>
+ <to uri="aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&amp;operation=createEventSourceMapping"/>
+ </route>
</camelContext>
<bean id="awsLambdaClient" class="org.apache.camel.component.aws.lambda.AmazonLambdaClientMock"/>