You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/10/18 16:02:50 UTC

[camel] 26/45: CAMEL-12884 - Camel-AWS Lambda: Add support for event source mapping

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0d172dd89039b80c4cbf27be4538d35c40fa178c
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Oct 16 13:49:32 2018 +0200

    CAMEL-12884 - Camel-AWS Lambda: Add support for event source mapping
    
    Conflicts:
    	components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
---
 .../component/aws/lambda/LambdaConstants.java      |  3 +-
 .../component/aws/lambda/LambdaOperations.java     |  3 +-
 .../camel/component/aws/lambda/LambdaProducer.java | 55 ++++++++++++++++++----
 .../aws/lambda/AmazonLambdaClientMock.java         |  7 ++-
 .../aws/lambda/LambdaComponentSpringTest.java      | 17 +++++++
 .../component/aws/lambda/LambdaProducerTest.java   | 20 ++++++++
 .../lambda/LambdaComponentSpringTest-context.xml   |  5 +-
 7 files changed, 98 insertions(+), 12 deletions(-)

diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
index 95ced62..ce64f5d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaConstants.java
@@ -43,5 +43,6 @@ public interface LambdaConstants {
     String SDK_REQUEST_TIMEOUT = "CamelAwsLambdaSdkRequestTimeout";
     String SECURITY_GROUP_IDS = "CamelAwsLambdaSecurityGroupIds";
     String SUBNET_IDS = "CamelAwsLambdaSubnetIds";
-
+    String EVENT_SOURCE_ARN = "CamelAwsLambdaEventSourceArn";
+    String EVENT_SOURCE_BATCH_SIZE = "CamelAwsLambdaEventSourceBatchSize";
 }
\ No newline at end of file
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
index 1d24bfe..23a9491 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaOperations.java
@@ -23,5 +23,6 @@ public enum LambdaOperations {
     createFunction,
     deleteFunction,
     invokeFunction,
-    updateFunction
+    updateFunction,
+    createEventSourceMapping
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
index d513561..265e3f0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/lambda/LambdaProducer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.aws.lambda;
 
+import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.nio.ByteBuffer;
@@ -23,8 +25,19 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.lambda.AWSLambda;
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingRequest;
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
 import com.amazonaws.services.lambda.model.CreateFunctionRequest;
 import com.amazonaws.services.lambda.model.CreateFunctionResult;
 import com.amazonaws.services.lambda.model.DeadLetterConfig;
@@ -42,14 +55,6 @@ import com.amazonaws.services.lambda.model.UpdateFunctionCodeRequest;
 import com.amazonaws.services.lambda.model.UpdateFunctionCodeResult;
 import com.amazonaws.services.lambda.model.VpcConfig;
 import com.amazonaws.util.IOUtils;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.support.DefaultProducer;
-import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.ObjectHelper;
-
-import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;
 
 /**
  * A Producer which sends messages to the Amazon Web Service Lambda <a
@@ -82,6 +87,9 @@ public class LambdaProducer extends DefaultProducer {
         case updateFunction:
             updateFunction(getEndpoint().getAwsLambdaClient(), exchange);
             break;
+        case createEventSourceMapping:
+            createEventSourceMapping(getEndpoint().getAwsLambdaClient(), exchange);
+            break;
         default:
             throw new IllegalArgumentException("Unsupported operation");
         }
@@ -337,6 +345,37 @@ public class LambdaProducer extends DefaultProducer {
         message.setBody(result);
     }
     
+    /**
+     * Creates an event source mapping for the configured function. The {@link LambdaConstants#EVENT_SOURCE_ARN}
+     * header is mandatory; batch size and SDK timeouts are applied only when their headers are set.
+     */
+    private void createEventSourceMapping(AWSLambda lambdaClient, Exchange exchange) {
+        final Message in = exchange.getIn();
+        final CreateEventSourceMappingResult mapping;
+        try {
+            CreateEventSourceMappingRequest request = new CreateEventSourceMappingRequest().withFunctionName(getConfiguration().getFunction());
+            if (!ObjectHelper.isNotEmpty(in.getHeader(LambdaConstants.EVENT_SOURCE_ARN))) {
+                throw new IllegalArgumentException("Event Source Arn must be specified");
+            }
+            request.withEventSourceArn(in.getHeader(LambdaConstants.EVENT_SOURCE_ARN, String.class));
+            if (ObjectHelper.isNotEmpty(in.getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE))) {
+                request.withBatchSize(in.getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, Integer.class));
+            }
+            if (ObjectHelper.isNotEmpty(in.getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) {
+                request.withSdkClientExecutionTimeout(in.getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class));
+            }
+            if (ObjectHelper.isNotEmpty(in.getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) {
+                request.withSdkRequestTimeout(in.getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class));
+            }
+            mapping = lambdaClient.createEventSourceMapping(request);
+        } catch (AmazonServiceException ase) {
+            // Trace the AWS error code, then let the original exception propagate unchanged.
+            LOG.trace("createEventSourceMapping command returned the error code {}", ase.getErrorCode());
+            throw ase;
+        }
+        getMessageForResponse(exchange).setBody(mapping);
+    }
+    
     private LambdaOperations determineOperation(Exchange exchange) {
         LambdaOperations operation = exchange.getIn().getHeader(LambdaConstants.OPERATION, LambdaOperations.class);
         if (operation == null) {
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
index 8c4ab2b..8e6ed76 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/AmazonLambdaClientMock.java
@@ -111,7 +111,12 @@ public class AmazonLambdaClientMock extends AbstractAWSLambda {
 
     @Override
     public CreateEventSourceMappingResult createEventSourceMapping(CreateEventSourceMappingRequest createEventSourceMappingRequest) {
-        throw new UnsupportedOperationException();
+        // Canned response: only the function ARN depends on the incoming request.
+        return new CreateEventSourceMappingResult()
+            .withBatchSize(100)
+            .withFunctionArn("arn:aws:lambda:eu-central-1:643534317684:function:" + createEventSourceMappingRequest.getFunctionName())
+            .withState("Enabled")
+            .withEventSourceArn("arn:aws:sqs:eu-central-1:643534317684:testqueue");
     }
 
     @Override
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
index 400bd02..2c9f9fa 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.aws.lambda;
 
 import java.io.*;
+
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
 import com.amazonaws.services.lambda.model.CreateFunctionResult;
 import com.amazonaws.services.lambda.model.DeleteFunctionResult;
 import com.amazonaws.services.lambda.model.GetFunctionResult;
@@ -111,6 +113,21 @@ public class LambdaComponentSpringTest extends CamelSpringTestSupport {
         assertNotNull(exchange.getOut().getBody(String.class));
         assertEquals(exchange.getOut().getBody(String.class), "{\"Hello\":\"Camel\"}");
     }
+    
+    @Test
+    public void lambdaCreateEventSourceMappingTest() throws Exception {
+        Exchange reply = template.send("direct:createEventSourceMapping", ExchangePattern.InOut, new Processor() {
+            @Override
+            public void process(Exchange request) throws Exception {
+                request.getIn().setHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, 100);
+                request.getIn().setHeader(LambdaConstants.EVENT_SOURCE_ARN, "arn:aws:sqs:eu-central-1:643534317684:testqueue");
+            }
+        });
+        assertMockEndpointsSatisfied();
+        // The expected ARN ends with the function name used in the endpoint URI (GetHelloWithName).
+        CreateEventSourceMappingResult mapping = reply.getOut().getBody(CreateEventSourceMappingResult.class);
+        assertEquals(mapping.getFunctionArn(), "arn:aws:lambda:eu-central-1:643534317684:function:GetHelloWithName");
+    }
 
 
     @Override
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
index 006358e..590ad59 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/lambda/LambdaProducerTest.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.aws.lambda;
 
 import java.io.*;
+
+import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult;
 import com.amazonaws.services.lambda.model.CreateFunctionResult;
 import com.amazonaws.services.lambda.model.DeleteFunctionResult;
 import com.amazonaws.services.lambda.model.GetFunctionResult;
@@ -152,6 +154,21 @@ public class LambdaProducerTest extends CamelTestSupport {
         assertNotNull(exchange.getOut().getBody(String.class));
         assertEquals(exchange.getOut().getBody(String.class), "{\"Hello\":\"Camel\"}");
     }
+    
+    @Test
+    public void lambdaCreateEventSourceMappingTest() throws Exception {
+        Exchange reply = template.send("direct:createEventSourceMapping", ExchangePattern.InOut, new Processor() {
+            @Override
+            public void process(Exchange request) throws Exception {
+                request.getIn().setHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, 100);
+                request.getIn().setHeader(LambdaConstants.EVENT_SOURCE_ARN, "arn:aws:sqs:eu-central-1:643534317684:testqueue");
+            }
+        });
+        assertMockEndpointsSatisfied();
+        // The expected ARN ends with the function name used in the endpoint URI (GetHelloWithName).
+        CreateEventSourceMappingResult mapping = reply.getOut().getBody(CreateEventSourceMappingResult.class);
+        assertEquals(mapping.getFunctionArn(), "arn:aws:lambda:eu-central-1:643534317684:function:GetHelloWithName");
+    }
 
 
     @Override
@@ -194,6 +211,9 @@ public class LambdaProducerTest extends CamelTestSupport {
                     .to("aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=updateFunction")
                     .to("mock:result");
 
+                from("direct:createEventSourceMapping")
+                .to("aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&operation=createEventSourceMapping")
+                .to("mock:result");
             }
         };
     }
diff --git a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
index 413404a..c3211dd 100644
--- a/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
+++ b/components/camel-aws/src/test/resources/org/apache/camel/component/aws/lambda/LambdaComponentSpringTest-context.xml
@@ -50,7 +50,10 @@
             <to uri="aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&amp;operation=deleteFunction"/>
         </route>
 
-
+        <route>
+            <from uri="direct:createEventSourceMapping"/>
+            <to uri="aws-lambda://GetHelloWithName?awsLambdaClient=#awsLambdaClient&amp;operation=createEventSourceMapping"/>
+        </route>
     </camelContext>
 
     <bean id="awsLambdaClient" class="org.apache.camel.component.aws.lambda.AmazonLambdaClientMock"/>