You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by mans2singh <gi...@git.apache.org> on 2016/02/12 06:16:04 UTC

[GitHub] nifi pull request: Nifi-1509 - Support aws lambda put processor

GitHub user mans2singh opened a pull request:

    https://github.com/apache/nifi/pull/216

    Nifi-1509 - Support aws lambda put processor

    Support AWS Lambda processor (https://issues.apache.org/jira/browse/NIFI-1509)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mans2singh/nifi nifi-1509

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/216.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #216
    
----
commit 180a90d12b9b36c25c35f28c7e7435de03e0ea7b
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-11T04:45:53Z

    first commit for aws lambda
    
    corrected attributes of flow file
    
    added shutdown + provenence calls
    
    minor formatting and unused imports correction
    
    removed unused property
    
    updated to populate exception attributes in flow file
    
    updated write attributes

commit cd05bedda84fdb6bf63fae2b973c708a54b18a56
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-12T05:14:28Z

    Nifi-1509 - commented out integration test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi-1509 - Support aws lambda put processor

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/216#discussion_r53075770
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/ITPutLambdaTest.java ---
    @@ -0,0 +1,146 @@
    +package org.apache.nifi.processors.aws.lambda;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import java.util.List;
    +
    +import org.apache.nifi.processors.aws.s3.FetchS3Object;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    + */
    +public class ITPutLambdaTest {
    --- End diff --
    
    This class needs to just be called ITPutLambda.  The *Test ending has it getting treated as a unit test as part of maven-surefire.  Failsafe operates on those that are IT*.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi-1509 - Support aws lambda put processor

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/216#discussion_r55425686
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java ---
    @@ -0,0 +1,239 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.lambda;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.nio.charset.Charset;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +
    +import com.amazonaws.AmazonServiceException;
    +import com.amazonaws.services.lambda.AWSLambdaClient;
    +import com.amazonaws.services.lambda.model.InvalidParameterValueException;
    +import com.amazonaws.services.lambda.model.InvalidRequestContentException;
    +import com.amazonaws.services.lambda.model.InvocationType;
    +import com.amazonaws.services.lambda.model.InvokeRequest;
    +import com.amazonaws.services.lambda.model.InvokeResult;
    +import com.amazonaws.services.lambda.model.LogType;
    +import com.amazonaws.services.lambda.model.RequestTooLargeException;
    +import com.amazonaws.services.lambda.model.ResourceNotFoundException;
    +import com.amazonaws.services.lambda.model.TooManyRequestsException;
    +import com.amazonaws.services.lambda.model.UnsupportedMediaTypeException;
    +import com.amazonaws.util.Base64;
    +
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "lambda", "put"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Lamba Function. "
    +    + "The AWS credentials used for authentication must have permissions execute the Lambda function (lambda:InvokeFunction)."
    +    + "The FlowFile content must be JSON.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.lambda.result.function.error", description = "Function error message in result on posting message to AWS Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.result.status.code", description = "Status code in the result for the message when posting to AWS Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.result.payload", description = "Payload in the result from AWS Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.result.log", description = "Log in the result of the message posted to Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.exception.message", description = "Exception message on invoking from AWS Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.exception.cause", description = "Exception cause on invoking from AWS Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.exception.error.code", description = "Exception error code on invoking from AWS Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.exception.request.id", description = "Exception request id on invoking from AWS Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.exception.status.code", description = "Exception status code on invoking from AWS Lambda"),
    +    @WritesAttribute(attribute = "aws.lambda.exception.error.type", description = "Exception error type on invoking from AWS Lambda")
    +    })
    +public class PutLambda extends AbstractAWSLambdaProcessor {
    +
    +    /**
    +     * Lambda result function error message
    +     */
    +    public static final String AWS_LAMBDA_RESULT_FUNCTION_ERROR = "aws.lambda.result.function.error";
    +
    +    /**
    +     * Lambda response status code
    +     */
    +    public static final String AWS_LAMBDA_RESULT_STATUS_CODE = "aws.lambda.result.status.code";
    +
    +    /**
    +     * Lambda response log tail (4kb)
    +     */
    +    public static final String AWS_LAMBDA_RESULT_LOG = "aws.lambda.result.log";
    +
    +    /**
    +     * Lambda payload in response
    +     */
    +    public static final String AWS_LAMBDA_RESULT_PAYLOAD = "aws.lambda.result.payload";
    +
    +    /**
    +     * Lambda exception field
    +     */
    +    public static final String AWS_LAMBDA_EXCEPTION_MESSAGE = "aws.lambda.exception.message";
    +
    +    /**
    +     * Lambda exception field
    +     */
    +    public static final String AWS_LAMBDA_EXCEPTION_CAUSE = "aws.lambda.exception.cause";
    +
    +    /**
    +     * Lambda exception field
    +     */
    +    public static final String AWS_LAMBDA_EXCEPTION_ERROR_CODE = "aws.lambda.exception.error.code";
    +
    +    /**
    +     * Lambda exception field
    +     */
    +    public static final String AWS_LAMBDA_EXCEPTION_REQUEST_ID = "aws.lambda.exception.request.id";
    +
    +    /**
    +     * Lambda exception field
    +     */
    +    public static final String AWS_LAMBDA_EXCEPTION_STATUS_CODE = "aws.lambda.exception.status.code";
    +
    +    /**
    +     * Lambda exception field
    +     */
    +    public static final String AWS_LAMBDA_EXCEPTION_ERROR_TYPE = "aws.lambda.exception.error.type";
    +
    +    /**
    +     * Max request body size
    +     */
    +    public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000;
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT
    +            ));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final String functionName = context.getProperty(AWS_LAMBDA_FUNCTION_NAME).getValue();
    +
    +        final String qualifier = context.getProperty(AWS_LAMBDA_FUNCTION_QUALIFIER).getValue();
    +
    +        // Max size of message is 6 MB
    +        if ( flowFile.getSize() > MAX_REQUEST_SIZE) {
    +            getLogger().error("Max size for request body is 6mb but was {} for flow file {} for function {}",
    +                new Object[]{flowFile.getSize(), flowFile, functionName});
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        final AWSLambdaClient client = getClient();
    +
    +        try {
    +            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +            session.exportTo(flowFile, baos);
    +
    +            InvokeRequest invokeRequest = new InvokeRequest()
    +                .withFunctionName(functionName)
    +                .withLogType(LogType.Tail).withInvocationType(InvocationType.RequestResponse)
    +                .withPayload(ByteBuffer.wrap(baos.toByteArray()))
    +                .withQualifier(qualifier);
    +            long startTime = System.nanoTime();
    +
    +            InvokeResult result = client.invoke(invokeRequest);
    +
    +            flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_STATUS_CODE, result.getStatusCode().toString());
    +
    +            if ( !StringUtils.isBlank(result.getLogResult() )) {
    +                flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_LOG, new String(Base64.decode(result.getLogResult()),Charset.defaultCharset()));
    +            }
    +
    +            if ( result.getPayload() != null ) {
    +                flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_PAYLOAD, new String(result.getPayload().array(),Charset.defaultCharset()));
    +            }
    +
    +            if ( ! StringUtils.isBlank(result.getFunctionError()) ){
    +                flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_FUNCTION_ERROR, result.getFunctionError());
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_SUCCESS);
    +                final long totalTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
    +                session.getProvenanceReporter().send(flowFile, functionName, totalTimeMillis);
    +            }
    +        } catch (final InvalidRequestContentException
    +            | InvalidParameterValueException
    +            | RequestTooLargeException
    +            | ResourceNotFoundException
    +            | UnsupportedMediaTypeException unrecoverableException) {
    +                getLogger().error("Failed to invoke lambda {} with unrecoverable exception {} for flow file {}",
    +                    new Object[]{functionName, unrecoverableException, flowFile});
    +                flowFile = populateExceptionAttributes(session, flowFile, unrecoverableException);
    +                session.transfer(flowFile, REL_FAILURE);
    +        } catch (final TooManyRequestsException retryableServiceException) {
    +            getLogger().error("Failed to invoke lambda {} with exception {} for flow file {}, therefore penalizing flowfile",
    +                new Object[]{functionName, retryableServiceException, flowFile});
    +            flowFile = populateExceptionAttributes(session, flowFile, retryableServiceException);
    +            flowFile = session.penalize(flowFile);
    +            session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        } catch (final AmazonServiceException unrecoverableServiceException) {
    +            getLogger().error("Failed to invoke lambda {} with exception {} for flow file {} sending to fail",
    +                new Object[]{functionName, unrecoverableServiceException, flowFile});
    +            flowFile = populateExceptionAttributes(session, flowFile, unrecoverableServiceException);
    +            session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        } catch (final Exception exception) {
    +            getLogger().error("Failed to invoke lambda {} with exception {} for flow file {}",
    +                new Object[]{functionName, exception, flowFile});
    +            session.transfer(flowFile, REL_FAILURE);
    +            context.yield();
    +        }
    +    }
    +
    +    /**
    +     * Populate exception attributes in the flow file
    +     * @param session process session
    +     * @param flowFile the flow file
    +     * @param exception exception thrown during invocation
    +     * @return FlowFile the updated flow file
    +     */
    +    private FlowFile populateExceptionAttributes(final ProcessSession session, FlowFile flowFile,
    +            final AmazonServiceException exception) {
    +        flowFile = session.putAttribute(flowFile, AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
    --- End diff --
    
    I would recommend that we create a HashMap here to put these attributes into and then call session.putAttributes() instead of many calls to session.putAttribute(), as it is much more efficient. Session.putAttribute() has to create a new FlowFile for each invocation, and this is more expensive than just creating a HashMap once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi-1509 - Support aws lambda put processor

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/216#discussion_r53082519
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/ITPutLambdaTest.java ---
    @@ -0,0 +1,146 @@
    +package org.apache.nifi.processors.aws.lambda;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import java.util.List;
    +
    +import org.apache.nifi.processors.aws.s3.FetchS3Object;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    + */
    +public class ITPutLambdaTest {
    --- End diff --
    
    Corrected test class name


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi-1509 - Support aws lambda put processor

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/216


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi-1509 - Support aws lambda put processor

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/216#issuecomment-190237652
  
    @aldrin @jvwing - Please let me know if there is anything else required for this processor.  Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi-1509 - Support aws lambda put processor

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/216#issuecomment-194116446
  
    @markap14 - I've updated the code based on your comments. Please let me know if you have any other recommendation. Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---