Posted to dev@nifi.apache.org by mans2singh <gi...@git.apache.org> on 2016/02/21 17:29:10 UTC

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

GitHub user mans2singh opened a pull request:

    https://github.com/apache/nifi/pull/239

    Nifi 1540 - AWS Kinesis Get and Put Processors

    Hi:  I have implemented Kinesis consumer and producer processors.  I've created a new base class for AWS processors that does not depend on the AWS client directly.  Please let me know if you have any comments/recommendations.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mans2singh/nifi Nifi-1540

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #239
    
----
commit d4e3dd7b8e6969c1801fbfe9e4485a4fed3d334b
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-12T03:39:08Z

    created base class for aws processors

commit 2376bdeb95e1ff83fc81f6e7b76c7d6945d936d2
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-12T03:39:32Z

    updated pom file for client libs

commit 10f9d84f277096167057bc696e5224999a0e5a18
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-14T16:47:22Z

    kinesis consumer test

commit 27db6adc466b93325e8412c2d756df9c78e13b6b
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-17T04:03:35Z

    renamed test class

commit 07cfce2c7dfe13c6db005a9369c273a86d5fcef5
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-17T15:35:13Z

    support for consumer and refactoring
    
    updated with record handling always allowed
    
    updated consumer properties and added comments
    
    added exception handling
    
    corrected description
    
    refactoring and tests
    
    removed record processor factory from member variable
    
    integration test
    
    added integration tests
    
    added integration and unit tests

commit bf3bda16a03166329f5a235b8a6d31c15d23f8d5
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-21T04:29:42Z

    updated comment

commit 14cd73f9ff7d3ab1cce5671de0af29f4d5184692
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-21T04:35:55Z

    comments cleanup and ignore integration tests

commit d2cc2a03f0744df1951e9914e5d0d33d403fe9bf
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-21T07:13:57Z

    synced with master and resolved conflicts

commit 34278599d48b141c1059fb3ef7a0b6abc7c6caef
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-21T07:20:21Z

    uncommented integration tests

commit 4f5eb90f446324e1aec1b1d737c5371ce6006f81
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-21T07:22:41Z

    removed unused imports

commit 4020abdf277df1d5e252e7cbc13d947852050532
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-21T15:32:05Z

    commented out integration tests

commit 7d85f655d483bb19f41bb74d9a7650f9d1f4ce04
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-21T16:21:23Z

    checkstyle errors resolution

commit fcd3eaf6ed27fe272ccb0245f82035154bc290b3
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-21T16:27:19Z

    minor comment updates

----


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    The latest changes look good; I've merged to master.
    
    @mans2singh - thank you for the contributions, and for your exceptional perseverance in working this to completion.


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @miquillo, would you please expand on your comment on Kinesis Stream limits?  I believe the code so far respects the batch limits for both records and bytes.  The BATCH_SIZE property is validated for 1-500 records (default 250) and flowfiles are pulled from the inbound queues only up to the maximum number of records and (approximately) the maximum bytes from MAX_MESSAGE_BUFFER_SIZE_MB.
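    
    Roughly, that pulling pattern looks like the following sketch (simplified and illustrative, not the PR's exact code; it assumes it runs inside onTrigger where `session` is available, and the cap values are placeholders):
    
    ```
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.FlowFileFilter;
    
    // Accept flowfiles until either the record cap or the byte cap is reached.
    final int maxRecords = 250;           // from BATCH_SIZE (1-500, default 250)
    final long maxBytes = 1024L * 1024L;  // derived from MAX_MESSAGE_BUFFER_SIZE_MB
    final AtomicInteger records = new AtomicInteger(0);
    final AtomicLong bytes = new AtomicLong(0L);
    final List<FlowFile> batch = session.get(new FlowFileFilter() {
        @Override
        public FlowFileFilterResult filter(final FlowFile ff) {
            if (records.incrementAndGet() > maxRecords
                    || bytes.addAndGet(ff.getSize()) > maxBytes) {
                return FlowFileFilterResult.REJECT_AND_TERMINATE;
            }
            return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
        }
    });
    ```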
    
    It's true this processor does NOT throttle the total throughput per second, but that can be achieved with an upstream ControlRate processor or by scheduling PutKinesis itself.  I don't think we need to address it beyond capturing the error messages (we do).  The defaults seem sensible for most users.
    
    Do you agree?


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r63047632
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractBaseAWSProcessor.java ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import com.amazonaws.Protocol;
    +import com.amazonaws.regions.Region;
    +import com.amazonaws.regions.Regions;
    +
    +/**
    + * This is the base class for NiFi AWS processors.  It contains the basic property descriptors, AWS credentials handling, and relationships, and
    + * is not dependent on AmazonWebServiceClient classes.  Its subclasses add support for interacting with the respective AWS
    + * clients.
    + *
    + * @see AbstractAWSCredentialsProviderProcessor
    + * @see AbstractAWSProcessor
    + * @see AmazonWebServiceClient
    + */
    +public abstract class AbstractBaseAWSProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +                .description("FlowFiles are routed to success relationship").build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +                .description("FlowFiles are routed to failure relationship").build();
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    +    /**
    +     * AWS credentials provider service
    +     *
    +     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
    +     */
    +    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
    +            .name("AWS Credentials Provider service")
    +            .description("The Controller Service that is used to obtain aws credentials provider")
    +            .required(false)
    +            .identifiesControllerService(AWSCredentialsProviderService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder()
    +                .name("Credentials File")
    +                .expressionLanguageSupported(false)
    +                .required(false)
    +                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +                .build();
    +    public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
    +                .name("Access Key")
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .sensitive(true)
    +                .build();
    +    public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
    +                .name("Secret Key")
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .sensitive(true)
    +                .build();
    +    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
    +                .name("Region")
    +                .required(true)
    +                .allowableValues(getAvailableRegions())
    +                .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
    +                .build();
    +    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +                .name("Communications Timeout")
    +                .required(true)
    +                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +                .defaultValue("30 secs")
    +                .build();
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +                .name("SSL Context Service")
    +                .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
    +                .required(false)
    +                .identifiesControllerService(SSLContextService.class)
    +                .build();
    +    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
    +                .name("Endpoint Override URL")
    +                .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
    +                        "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
    +                        "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
    +                .required(false)
    +                .addValidator(StandardValidators.URL_VALIDATOR)
    +                .build();
    +    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
    +            .name("Proxy Host")
    +            .description("Proxy host name or IP")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
    +            .name("Proxy Host Port")
    +            .description("Proxy host port")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    protected volatile Region region;
    +    protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
    +    protected static final String DEFAULT_USER_AGENT = "NiFi";
    +
    +    private static AllowableValue createAllowableValue(final Regions regions) {
    +        return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
    +    }
    +
    +    private static AllowableValue[] getAvailableRegions() {
    +        final List<AllowableValue> values = new ArrayList<>();
    +        for (final Regions regions : Regions.values()) {
    +            values.add(createAllowableValue(regions));
    +        }
    +
    +        return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]);
    +    }
    +
    +    public AbstractBaseAWSProcessor() {
    +        super();
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +    }
    --- End diff --
    
    I will remove these. 


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-222191326
  
    Hi @jvwing 
    
    Thanks again for your comments:
    
    I am currently not using REL_FAILURE in GetKinesis, as you've mentioned, and wanted your advice on the failure scenarios in GetKinesis:
    
    In GetKinesis.java (lines 196-199):
    
    {code:java}
    } catch (Exception e) {
        if (flowFile != null) {
            session.remove(flowFile);
        }
    {code}
    
    I am not sure, when an exception is thrown, whether I should just remove the flowfile from the session, transfer it to REL_FAILURE, or both?
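    
    For reference, the two options would look roughly like this (a sketch, not the PR's actual code):
    
    {code:java}
    // Option 1: drop the flowfile created in this session
    session.remove(flowFile);
    
    // Option 2: penalize it and route it to a failure relationship
    flowFile = session.penalize(flowFile);
    session.transfer(flowFile, REL_FAILURE);
    {code}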
    
    Please let me know your recommendations.
    
    Thanks


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @mans2singh thanks for the displayName, namespace, and test doc changes.
    * **Logging** - I'm mostly comfortable with the logging as-is; I certainly agree with your point about most problems being up-front configuration.  Would it be OK to move the exception message before the listing of the flowfiles (PutKinesis.java:168)?  I think it would make problems easier to understand in bulletins on the processor, while still leaving the flowfile detail.
    * **Naming** - Changing the naming does seem appropriate, to mirror what you have already done with Kinesis Firehose.  Would you please rename
     - PutKinesis to PutKinesisStream (or similar)
     - AbstractKinesisProcessor to AbstractKinesisStreamProcessor (or similar)


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-211711720
  
    Hello @mans2singh .  Do you think you'll have a chance to incorporate these review feedback items?


---

[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82722038
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +
    +import com.amazonaws.AmazonWebServiceClient;
    +
    +/**
    + * This class provides the base class for Kinesis client processors.
    + */
    +public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient>
    +    extends AbstractAWSCredentialsProviderProcessor<ClientType> {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("Batch size for messages (1-500).")
    +            .defaultValue("250")
    +            .required(false)
    +            .addValidator(StandardValidators.createLongValidator(1, 500, true))
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
    +            .name("Max message buffer size")
    --- End diff --
    
    Needs `name` and `displayName` separation.
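    
    For example, a sketch of the requested separation (the machine-readable `name` value, description, and validator here are assumptions, not the PR's code):
    
    ```
    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
            .name("max-message-buffer-size")         // stable identifier; safe to keep if the label changes
            .displayName("Max message buffer size")  // human-readable label shown in the UI
            .description("Maximum buffer size for a batch of outgoing messages.")
            .required(false)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();
    ```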


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-222057686
  
    Your suggestion about the plain old SDK Kinesis API is interesting, but I guess I don't recommend it.  Using the plain SDK would let NiFi handle the multithreading, retries, batching, etc.  It would be a win for the NiFi model.  But then we would also have to pick up at least some of the KCL's other features, like balancing traffic across shards, or leave that as an exercise for the user.  It sounds complicated and hard, and I'm lazy.  Also, the KCL is recommended by Amazon, as you pointed out earlier, and not using it might be a point of confusion as to why we didn't do it the "right" way, especially if a new version of the KCL is released with features we hadn't thought of.  So even though the KCL is an awkward fit in NiFi, as long as it is an opt-in feature, it seems like a good addition to NiFi's AWS interoperability story.
    
    Part of why I would prefer not to change the base AWS processor classes is to keep it opt-in.  I'm not sure how the other AWS Get* processors would benefit from the Kinesis-like threading model; they do not have applications or threads to drive activity outside of NiFi's scheduling.  But I can see lobbying for a change to `AbstractProcessor` to remove `onTrigger`'s `final` modifier for cases like these without divorcing the class hierarchies.
    
    A recently merged [change in the 0.x branch](https://github.com/apache/nifi/commit/de7ecd719a2e9907042628ea3a8283cfe2d4fbac) for NIFI-786 has some shared credential property descriptors and validation logic that you may be able to use, which will hopefully make it easier to implement separate base classes.  Let me know if I can help with that.



---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Nice job on this @mans2singh @jvwing @simplesteph @apiri!  Good discussion of various tradeoffs.  You handled the license implications well (I've been double-checking, and all appear to be ASLv2!).


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    The JIRA ticket is [NIFI-2892](https://issues.apache.org/jira/browse/NIFI-2892).  I don't see recent progress on it, but would that be something you are interested in working on?


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing - 
    Thanks again for your thorough review.  
    Just for my own learning, if there are any pointers in the errors that indicated it was a memory issue, please let me know.  Also, is there any guide that lists the requirements and steps for installing/running NiFi on AWS?
    I will work on the PropertyDescriptor name changes. 
    Please let me know if there is anything else I need to do.
    Thanks.


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @mans2singh thanks for your recent updates; the property descriptor improvements look good to me.
    
    As far as property descriptors for data size ("10 KB") and duration ("1 minute") - I believe it is possible to let users specify "1 minute" or "5 minutes" while validating the range in milliseconds and retrieving the value in milliseconds as a long integer.  I also understand the value of transparently passing values through to Kinesis library, and I don't feel strongly about this either way.  Let me know if some sample code would help.
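    
    A sketch of that approach (the property name, description, and default below are illustrative, and it reuses the MAX_MESSAGE_BUFFER_SIZE_MB descriptor from the PR):
    
    ```
    import java.util.concurrent.TimeUnit;
    
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.DataUnit;
    import org.apache.nifi.processor.util.StandardValidators;
    
    public static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder()
            .name("Checkpoint Interval")
            .description("Interval between checkpoints, e.g. '1 minute' or '30 secs'.")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("1 minute")
            .build();
    
    // In onTrigger/onScheduled: retrieve the validated value as milliseconds.
    final long checkpointMillis = context.getProperty(CHECKPOINT_INTERVAL)
            .asTimePeriod(TimeUnit.MILLISECONDS);
    
    // Data sizes work the same way via DATA_SIZE_VALIDATOR and asDataSize():
    final long bufferBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB)
            .asDataSize(DataUnit.B).longValue();
    ```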
    
    Outstanding issues as I understand them:
    * **AWS Processor Base Class** - I do not believe we need to change the base class hierarchy for all AWS processors as part of this PR.  I recommend merging the code currently in AbstractBaseAWSProcessor into AbstractKinesisProcessor, and reverting changes to AbstractAWSProcessor.  
    * **Notice/License text** - We will need to figure out what text is required in the notice files for the KCL/KPL.



---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @mans2singh, I agree the licensing issues prevent it from being bundled with NiFi as-is.  But there might be a few options to move forward and keep the benefits of your work:
    
    * Publish the processors you've already developed as a standalone NAR project
    * Rework the processors to use the Kinesis client in the standard AWS SDK
    
    Any other ideas?


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    I have shared some [recommended edits to the property labels and doc text](https://github.com/jvwing/nifi/commit/e431a9665ce5c2cbbabf2605cc088108ed74e7c5); please take a look.


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-222062467
  
    Is `REL_FAILURE` used in GetKinesis?


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-220657759
  
    @mans2singh, I am a bit behind on this PR, but will try to get up to speed and contribute to the review.


---

[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82723210
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java ---
    @@ -0,0 +1,475 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.processors.aws.kinesis.stream.PutKinesis;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +/**
    + * This test class contains both unit and integration tests (integration tests are ignored by default).
    + */
    +public class ITPutKinesis {
    +
    +    private TestRunner runner;
    +    protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        runner = TestRunners.newTestRunner(PutKinesis.class);
    +        runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream");
    --- End diff --
    
    Why not use a class member variable for the happy-path stream name?  It is repeated for each test, and I would have preferred the simplicity of changing it in only one place.
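    
    For example (sketch of the suggested change, using only names from the diff above):
    
    ```
    private static final String TEST_STREAM_NAME = "kstream";
    
    @Before
    public void setUp() throws Exception {
        runner = TestRunners.newTestRunner(PutKinesis.class);
        runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, TEST_STREAM_NAME);
    }
    ```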


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by davidkj69 <gi...@git.apache.org>.
Github user davidkj69 commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Can someone point me to the JIRA ticket for the GetKinesis processor?  I don't see it in the current version of NiFi, i.e. 1.1.2.  Is this processor still on the roadmap for development?


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/239
  
    Hey folks: For this pull request, the Travis CI build is passing for me but AppVeyor is failing in the nifi-framework-core module with the following errors.  Please let me know if there is anything I need to do to resolve this issue.
    
    Thanks
    
    [00:58:13] Failed tests: 
    [00:58:13]   TestFileSystemRepository.testRemoveDeletesFileIfNoClaimants:274 null
    [00:58:13] 
    [00:58:13] 
    [00:58:13] Tests in error: 
    [00:58:13] 
    [00:58:13]   TestFileSystemRepository.testExportToFile:348 » FileSystem C:\projects\nifi\ni...
    [00:58:13]   TestFileSystemRepository.testExportToOutputStream:335 » FileSystem C:\projects...
    [00:58:13] org.apache.nifi.controller.scheduling.TestProcessorLifecycle.validateEnableOperation(org.apache.nifi.controller.scheduling.TestProcessorLifecycle)
    [00:58:13]   Run 1: TestProcessorLifecycle.validateEnableOperation:97->buildFlowControllerForTest:714 » Runtime
    [00:58:13]   Run 2: TestProcessorLifecycle.after:91 » IO Unable to delete file: .\target\test-repo...
    [00:58:13] 
    [00:58:13] org.apache.nifi.controller.scheduling.TestProcessorLifecycle.validateIdempotencyOfProcessorStartOperation(org.apache.nifi.controller.scheduling.TestProcessorLifecycle)
    [00:58:13]   Run 1: TestProcessorLifecycle.validateIdempotencyOfProcessorStartOperation:178 expected:<1> but was:<0>
    [00:58:13]   Run 2: TestProcessorLifecycle.after:91 » IO Unable to delete file: .\target\test-repo...
    [00:58:13] 
    [00:58:13] 
    [00:58:13] 
    [00:58:13] Tests run: 182, Failures: 1, Errors: 4, Skipped: 2
    



---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r55396443
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractBaseAWSProcessor.java ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import com.amazonaws.Protocol;
    +import com.amazonaws.regions.Region;
    +import com.amazonaws.regions.Regions;
    +
    +/**
    + * This is the base class for NiFi AWS processors.  It contains the basic property descriptors, AWS credentials handling, and relationships, and
    + * is not dependent on AmazonWebServiceClient classes.  Its subclasses add support for interacting with the respective AWS
    + * clients.
    + *
    + * @see AbstractAWSCredentialsProviderProcessor
    + * @see AbstractAWSProcessor
    + * @see AmazonWebServiceClient
    + */
    +public abstract class AbstractBaseAWSProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +                .description("FlowFiles are routed to success relationship").build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +                .description("FlowFiles are routed to failure relationship").build();
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    +    /**
    +     * AWS credentials provider service
    +     *
    +     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
    +     */
    +    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
    +            .name("AWS Credentials Provider service")
    +            .description("The Controller Service that is used to obtain aws credentials provider")
    +            .required(false)
    +            .identifiesControllerService(AWSCredentialsProviderService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder()
    +                .name("Credentials File")
    +                .expressionLanguageSupported(false)
    +                .required(false)
    +                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +                .build();
    +    public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
    +                .name("Access Key")
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .sensitive(true)
    +                .build();
    +    public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
    +                .name("Secret Key")
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .sensitive(true)
    +                .build();
    +    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
    +                .name("Region")
    +                .required(true)
    +                .allowableValues(getAvailableRegions())
    +                .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
    +                .build();
    +    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +                .name("Communications Timeout")
    +                .required(true)
    +                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +                .defaultValue("30 secs")
    +                .build();
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +                .name("SSL Context Service")
    +                .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
    +                .required(false)
    +                .identifiesControllerService(SSLContextService.class)
    +                .build();
    +    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
    +                .name("Endpoint Override URL")
    +                .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
    +                        "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
    +                        "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
    +                .required(false)
    +                .addValidator(StandardValidators.URL_VALIDATOR)
    +                .build();
    +    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
    +            .name("Proxy Host")
    +            .description("Proxy host name or IP")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
    +            .name("Proxy Host Port")
    +            .description("Proxy host port")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    protected volatile Region region;
    +    protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
    +    protected static final String DEFAULT_USER_AGENT = "NiFi";
    +
    +    private static AllowableValue createAllowableValue(final Regions regions) {
    +        return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
    +    }
    +
    +    private static AllowableValue[] getAvailableRegions() {
    +        final List<AllowableValue> values = new ArrayList<>();
    +        for (final Regions regions : Regions.values()) {
    +            values.add(createAllowableValue(regions));
    +        }
    +
    +        return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]);
    +    }
    +
    +    public AbstractBaseAWSProcessor() {
    +        super();
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +    }
    --- End diff --
    
    Consider removing it since it adds no value.  Same for the constructor above.


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Hey @jvwing & @joewitt 
    
    James, your comment listing the associated JIRA legal issue actually had the nice side effect of getting included in the associated issue.  A response is here: https://issues.apache.org/jira/browse/LEGAL-198?focusedCommentId=15321636&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15321636
    
    Unfortunately, from my interpretation of Justin's comment, it seems like this falls under Category X and cannot be required for the build (meaning we would not be able to bundle it).  It seems we are a bit stuck with regard to including this in the NiFi assembly by default, and we would likely need guidance on how to build it separately until we have an extension registry that can provide some additional flexibility.


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @mans2singh, do you plan to continue working on this PR?  If not, would you please close it? 


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Thanks for noticing that, I didn't realize it would cross-link.  That's not what I was hoping for.


---

[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82722085
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisProcessor.java ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.stream;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +
    +/**
    + * This class provides the base class for Kinesis client processors.
    + */
    +public abstract class AbstractKinesisProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisClient> {
    +
    +    public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder()
    +            .name("Amazon Kinesis Stream Name")
    --- End diff --
    
    Needs `name` and `displayName` separation.


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by simplesteph <gi...@git.apache.org>.
Github user simplesteph commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Amazing, guys, congrats! Quick question: are there plans to have a GetKinesis
    processor too, or should we open another JIRA for that?
    
    Kind regards,
    Stephane
    
    
    On 12 October 2016 at 3:04:31 PM, mans2singh (notifications@github.com)
    wrote:
    
    @jvwing <https://github.com/jvwing> - Thanks for your
    review/advice/guidance. Mans
    



---

[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82722028
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +
    +import com.amazonaws.AmazonWebServiceClient;
    +
    +/**
    + * This class provides the base class for Kinesis client processors.
    + */
    +public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient>
    +    extends AbstractAWSCredentialsProviderProcessor<ClientType> {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    --- End diff --
    
    Needs `name` and `displayName` separation.


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by joewitt <gi...@git.apache.org>.
Github user joewitt commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-214591612
  
    Certainly, @mans2singh - I went ahead and removed the fix version from the JIRA.  Totally understand, and thanks.


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/239
  
    Hi @jvwing - I made the changes you had recommended (added unit tests for get and put kinesis, renamed the get and put processors, added comments about usage of DynamoDB and CloudWatch), and also rebased the code on the current master.
    
    I also wanted to find out your recommendation for handling the case when the session is producing a flow file (GetKinesis) and there is an exception while processing it.  Currently, I am just removing the flow file that was created and checkpointing to the last flow file successfully processed.
    
    Please let me know if you have any other recommendation.
    
    Thanks
    
    Mans


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Hi @jvwing 
    
    Just wanted to check if you were able to test the processors and if you have any additional review comments/recommendations for me.  
    
    Thanks 
    
    Mans


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Ok - I will add more logging.  Let me know if you have any other observations.


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Thanks for your latest changes to the error handling.  The changes look OK; I don't think we need the failure relationship.  The integration tests for PutKinesisStream and GetKinesisStream both worked fine.  I set up a small flow putting and getting records from a Kinesis stream to test the processors.  The processors do work, but I had a rough experience interrupted by various errors that required a NiFi restart to fix.  Errors include the following, not necessarily in this sequence:
    
    * ERROR [pool-123-thread-4] c.a.s.kinesis.producer.KinesisProducer Error in child process
    java.lang.RuntimeException: EOF reached during read
    * ERROR [pool-37-thread-1] c.a.s.kinesis.producer.KinesisProducer Error in child process
    java.lang.RuntimeException: Child process exited with code 137
    * ERROR [Timer-Driven Process Thread-2] o.a.n.p.a.k.producer.PutKinesisStream
    com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    
    Are you familiar with any of these?  Once the child process errors show up, the PutKinesisStream processor seems to stop working.  I do not have a precise repro sequence yet, but they coincided with throughput around the throttle limit of my Kinesis stream.  Stopping and starting the processor did not help.  I was running this on an Amazon Linux EC2 instance with permissions for Kinesis, Dynamo, and CloudWatch.  I am not sure how to evaluate if this is a KPL problem or a PutKinesisStream problem.
    
    One suggestion I have for the error handling in PutKinesisStream would be to NOT log the entire batch of FlowFile (PutKinesisStream.java, lines 263, 268, and 272).  For example:
    
    ```
    	if ( failedFlowFiles.size() > 0 ) {
    		session.transfer(failedFlowFiles, PutKinesisStream.REL_FAILURE);
    		getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{stream, failedFlowFiles});
    	}
    ```
    With the default batch size of 250, 250 x FlowFile::toString() adds up to a very large block of text that makes it difficult to find the error.  I'm not sure how helpful the flow file records are.  I certainly recommend putting the exception first, and maybe leaving out the files?  Would a count of files be OK?


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing - Thanks for your review/advice/guidance.  Mans


---

[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by simplesteph <gi...@git.apache.org>.
Github user simplesteph commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82724343
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.stream;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.model.PutRecordsRequest;
    +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
    +import com.amazonaws.services.kinesis.model.PutRecordsResult;
    +import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis. "
    +    + "In order to send data to Kinesis, the stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.error.message", description = "Error message on posting message to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")})
    +public class PutKinesis extends AbstractKinesisProcessor {
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
    +
    +    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    +
    +    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    +
    +    public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder()
    +        .displayName("Amazon Kinesis Stream Partition Key")
    +        .name("amazon-kinesis-stream-partition-key")
    +        .description("The partition key attribute.  If it is not set, a random value is used")
    +        .expressionLanguageSupported(true)
    +        .defaultValue("${kinesis.partition.key}")
    +        .required(false)
    +        .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST,PROXY_HOST_PORT));
    +
    +    /** A random number generator for cases where partition key is not available */
    +    protected Random randomPartitionKeyGenerator = new Random();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
    +        final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue();
    +
    +        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName,
    +           AWS_KINESIS_ERROR_MESSAGE);
    +
    +        final AmazonKinesisClient client = getClient();
    +
    +        try {
    +            List<PutRecordsRequestEntry> records = new ArrayList<>();
    +
    +            List<FlowFile> failedFlowFiles = new ArrayList<>();
    +            List<FlowFile> successfulFlowFiles = new ArrayList<>();
    +
    +            // Prepare batch of records
    +            for (int i = 0; i < flowFiles.size(); i++) {
    +                FlowFile flowFile = flowFiles.get(i);
    +
    +                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +                session.exportTo(flowFile, baos);
    +                PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
    +
    +                String partitionKey = context.getProperty(PutKinesis.KINESIS_PARTITION_KEY)
    +                    .evaluateAttributeExpressions(flowFiles.get(i)).getValue();
    +
    +                if ( ! StringUtils.isBlank(partitionKey) ) {
    +                    record.setPartitionKey(partitionKey);
    +                } else {
    +                    record.setPartitionKey(Integer.toString(randomPartitionKeyGenerator.nextInt()));
    +                }
    +
    +                records.add(record);
    +            }
    +
    +            if ( records.size() > 0 ) {
    +
    +                PutRecordsRequest putRecordRequest = new PutRecordsRequest();
    +                putRecordRequest.setStreamName(streamName);
    +                putRecordRequest.setRecords(records);
    +                PutRecordsResult results = client.putRecords(putRecordRequest);
    +
    +                List<PutRecordsResultEntry> responseEntries = results.getRecords();
    +                for (int i = 0; i < responseEntries.size(); i++ ) {
    +                    PutRecordsResultEntry entry = responseEntries.get(i);
    +                    FlowFile flowFile = flowFiles.get(i);
    +
    +                    Map<String,String> attributes = new HashMap<>();
    +                    attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
    +                    attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
    +
    +                    if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
    +                        attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
    +                        attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
    +                        flowFile = session.putAllAttributes(flowFile, attributes);
    +                        failedFlowFiles.add(flowFile);
    +                    } else {
    +                        flowFile = session.putAllAttributes(flowFile, attributes);
    +                        successfulFlowFiles.add(flowFile);
    +                    }
    +                }
    +                if ( failedFlowFiles.size() > 0 ) {
    +                    session.transfer(failedFlowFiles, REL_FAILURE);
    +                    getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles});
    --- End diff --
    
    IMO a good balance is a count and an extract of 10 records (or whatever small number)
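    
    A rough sketch of that balance, reusing the fields already in the method (the sample size of 10 is illustrative):
    
    ```
    if (!failedFlowFiles.isEmpty()) {
        session.transfer(failedFlowFiles, REL_FAILURE);
        // log a count and a small sample rather than the whole batch
        final int sampleSize = Math.min(10, failedFlowFiles.size());
        getLogger().error("Failed to publish {} records to Kinesis stream {}; first {} failures: {}",
                new Object[]{failedFlowFiles.size(), streamName, sampleSize,
                        failedFlowFiles.subList(0, sampleSize)});
    }
    ```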


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Thanks, @mans2singh, I will review the changes.  I don't think the AppVeyor build is reliable; there don't appear to be any recent successful builds.
    
    For GetKinesisStream exceptions, I think your approach of logging without creating a FlowFile is correct.  I'm not sure we could count on there being anything useful to put in a FlowFile other than the error message, so a failed FlowFile could not be meaningfully processed beyond logging.  I did not find any other Get* processor with a failure route.
    
    Have you experienced errors at that stage of Kinesis client processing?  I don't have enough experience with Kinesis Streams to know what bad things are going to happen there, so I don't have clear failure scenarios to think through.  By checkpointing to the last successfully processed record, I believe the 'bad' record will be retried by the KCL.  This seems reasonable, but if NiFi continues to fail on the same record, won't we end up in an infinite retry loop?


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing - 
    
    I've updated the code to catch and log the exception and continue processing kinesis records as you had recommended.  
    
    I can remove REL_FAILURE since it is not being used.  Let me know your thoughts.
    
    Thanks
    
    Mans


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    It appears that my errors were caused by memory constraints on the KPL.  With a larger EC2 instance, I was able to run at the provisioned throughput threshold without the KPL process crashing.  The processors also worked fine through a shard merge.


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing - 
    
    I've added displayName to PropertyDescriptor and updated name based on your feedback.  
    
    I've not tried NiFi on AWS and mostly run it locally on my laptop; that is why I wanted to learn from your experience.
    
    Please let me know if you have any other observations/recommendations.
    
    Looking forward to your feedback.


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-222020291
  
    Hi @jvwing -
    
    Thanks for your feedback.  
    
    Regarding changing the hierarchy to create an `AmazonWebServiceClient`-independent base class - I can create separate KPL/KCL based processors, but then I will not be able to reuse some of the properties and methods available in NiFi's AWS base classes.  Also, there are `AmazonWebServiceClient` based Kinesis consumers and producers just like for S3, SNS, SQS, etc., but as you've also pointed out, we lose the advantages of automatic recovery, batching, monitoring, load balancing across shards, etc. provided by the KCL/KPL libraries.  I also think (this is just MHO) that maybe other event driven services (SQS, SNS, etc.) can benefit from the KCL/KPL model.  Let me know if re-implementing the KCL/KPL based processors independently of the current processors (i.e., without changing the base classes) is a better choice.
    
    I can change the names, document permissions required for dynamodb, cloudwatch, etc, and add more unit tests as you've recommended.
    
    Please let me know if you have any other suggestions.
    
    Thanks again for your comments/advice.
    
    Mans
    



---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-220644414
  
    Hey @olegz @apiri @jvwing @joewitt - I've tried to address your review comments above.  Please let me know if you have any feedback for me.  Thanks Mans


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing - 
    I was under the impression that it could not be bundled with NiFi due to AWS library license issues.  Please let me know how the Kinesis processors can be part of NiFi.
    Thanks


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r55397257
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/consumer/GetKinesis.java ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.consumer;
    +
    +import java.io.ByteArrayInputStream;
    +import java.net.InetAddress;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnShutdown;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
    +import org.apache.nifi.util.StopWatch;
    +
    +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
    +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
    +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
    +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
    +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
    +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
    +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
    +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
    +import com.amazonaws.services.kinesis.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({ "amazon", "aws", "kinesis", "get", "stream" })
    +@CapabilityDescription("Get the records from the specified Amazon Kinesis stream")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = GetKinesis.AWS_KINESIS_CONSUMER_RECORD_APPROX_ARRIVAL_TIMESTAMP, description = "Approximate arrival time of the record"),
    +    @WritesAttribute(attribute = GetKinesis.AWS_KINESIS_CONSUMER_RECORD_PARTITION_KEY, description = "Partition key of the record"),
    +    @WritesAttribute(attribute = GetKinesis.AWS_KINESIS_CONSUMER_RECORD_SEQUENCE_NUMBER, description = "Sequence number of the record"),
    +    @WritesAttribute(attribute = GetKinesis.AWS_KINESIS_CONSUMER_MILLIS_SECONDS_BEHIND, description = "Consumer lag for processing records"),
    +    @WritesAttribute(attribute = GetKinesis.KINESIS_CONSUMER_RECORD_START_TIMESTAMP, description = "Timestamp when the particular batch of records was processed "),
    +    @WritesAttribute(attribute = GetKinesis.KINESIS_CONSUMER_RECORD_NUMBER, description = "Record number of the record processed in that batch")
    +})
    +public class GetKinesis extends AbstractKinesisConsumerProcessor implements RecordsHandler {
    +
    +    /**
    +     * Attributes written by processor
    +     */
    +    public static final String AWS_KINESIS_CONSUMER_RECORD_PARTITION_KEY = "aws.kinesis.consumer.record.partition.key";
    +    public static final String AWS_KINESIS_CONSUMER_RECORD_SEQUENCE_NUMBER = "aws.kinesis.consumer.record.sequence.number";
    +    public static final String AWS_KINESIS_CONSUMER_RECORD_APPROX_ARRIVAL_TIMESTAMP = "aws.kinesis.consumer.record.approx.arrival.timestamp";
    +    public static final String AWS_KINESIS_CONSUMER_MILLIS_SECONDS_BEHIND = "aws.kinesis.consumer.record.milli.seconds.behind";
    +    public static final String KINESIS_CONSUMER_RECORD_START_TIMESTAMP = "kinesis.consumer.record.start.timestamp";
    +    public static final String KINESIS_CONSUMER_RECORD_NUMBER = "kinesis.consumer.record.number";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(REGION,
    +            AWS_CREDENTIALS_PROVIDER_SERVICE, KINESIS_STREAM_NAME, KINESIS_CONSUMER_APPLICATION_NAME,
    +            KINESIS_CONSUMER_WORKER_ID_PREFIX, BATCH_SIZE, KINESIS_CONSUMER_INITIAL_POSITION_IN_STREAM,
    +            KINESIS_CONSUMER_DEFAULT_FAILOVER_TIME_MILLIS, KINESIS_CONSUMER_DEFAULT_MAX_RECORDS,
    +            KINESIS_CONSUMER_DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, KINESIS_CONSUMER_DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
    +            KINESIS_CONSUMER_DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, KINESIS_CONSUMER_DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
    +            KINESIS_CONSUMER_DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
    +            KINESIS_CONSUMER_DEFAULT_TASK_BACKOFF_TIME_MILLIS, KINESIS_CONSUMER_DEFAULT_METRICS_BUFFER_TIME_MILLIS,
    +            KINESIS_CONSUMER_DEFAULT_METRICS_MAX_QUEUE_SIZE, KINESIS_CONSUMER_DEFAULT_METRICS_LEVEL));
    +
    +    protected Worker consumerWorker;
    +
    +    protected int batchSize;
    +    protected ExecutorService executor = Executors.newCachedThreadPool();
    +
    +    private String streamName;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws ProcessException {
    +
    +        final AWSCredentialsProviderService awsCredentialsProviderService = context
    +                .getProperty(GetKinesis.AWS_CREDENTIALS_PROVIDER_SERVICE)
    +                .asControllerService(AWSCredentialsProviderService.class);
    +
    +        streamName = context.getProperty(GetKinesis.KINESIS_STREAM_NAME).getValue();
    +        KinesisClientLibConfiguration config;
    +
    +        try {
    +            config = new KinesisClientLibConfiguration(
    +                context.getProperty(GetKinesis.KINESIS_CONSUMER_APPLICATION_NAME).getValue(),
    +                context.getProperty(GetKinesis.KINESIS_STREAM_NAME).getValue(),
    +                awsCredentialsProviderService.getCredentialsProvider(),
    +                context.getProperty(GetKinesis.KINESIS_CONSUMER_WORKER_ID_PREFIX).getValue()+ ":"
    +                    + InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID().toString())
    +                    .withRegionName(context.getProperty(GetKinesis.REGION).getValue())
    +                    .withInitialPositionInStream(
    +                         InitialPositionInStream.valueOf(
    +                             context.getProperty(KINESIS_CONSUMER_INITIAL_POSITION_IN_STREAM).getValue()))
    +                    .withFailoverTimeMillis(
    +                             context.getProperty(KINESIS_CONSUMER_DEFAULT_FAILOVER_TIME_MILLIS).asLong())
    +                    .withMaxRecords(context.getProperty(KINESIS_CONSUMER_DEFAULT_MAX_RECORDS).asInteger())
    +                    .withIdleTimeBetweenReadsInMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_IDLETIME_BETWEEN_READS_MILLIS).asLong())
    +                    .withCallProcessRecordsEvenForEmptyRecordList(!context.getProperty(KINESIS_CONSUMER_DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST).asBoolean())
    +                    .withParentShardPollIntervalMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS).asLong())
    +                    .withShardSyncIntervalMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_SHARD_SYNC_INTERVAL_MILLIS).asLong())
    +                    .withCleanupLeasesUponShardCompletion(context.getProperty(KINESIS_CONSUMER_DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION).asBoolean())
    +                    .withTaskBackoffTimeMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_TASK_BACKOFF_TIME_MILLIS).asLong())
    +                    .withMetricsBufferTimeMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_METRICS_BUFFER_TIME_MILLIS).asLong())
    +                    .withMetricsMaxQueueSize(context.getProperty(KINESIS_CONSUMER_DEFAULT_METRICS_MAX_QUEUE_SIZE).asInteger())
    +                    .withMetricsLevel(context.getProperty(KINESIS_CONSUMER_DEFAULT_METRICS_LEVEL).getValue());
    +
    +            batchSize = context.getProperty(GetKinesis.BATCH_SIZE).asInteger();
    +            config.withMaxRecords(batchSize);
    +
    +            KinesisRecordProcessorFactory kinesisRecordProcessorFactory = new KinesisRecordProcessorFactory(this);
    +            consumerWorker = new Worker
    +                .Builder()
    +                .recordProcessorFactory(kinesisRecordProcessorFactory)
    +                .config(config)
    +                .build();
    +
    +            executor.execute(consumerWorker);
    +        } catch (Exception e) {
    +            throw new ProcessException(e);
    +        }
    +    }
    +
    +    @OnShutdown
    +    public void onShutdown() {
    +        if (consumerWorker != null)
    +            consumerWorker.shutdown();
    +        executor.shutdownNow();
    --- End diff --
    
    If the above line results in an exception, the executor will never be shut down.  As a result, NiFi will never get a chance to exit gracefully.  It has already happened in a few processors that were using a custom thread pool.
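    
    A minimal sketch of the guard being suggested, using the existing fields:
    
    ```
    @OnShutdown
    public void onShutdown() {
        try {
            if (consumerWorker != null) {
                consumerWorker.shutdown();
            }
        } finally {
            // always runs, so the thread pool is torn down even if the worker shutdown throws
            executor.shutdownNow();
        }
    }
    ```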


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r55396633
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractBaseAWSProcessor.java ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import com.amazonaws.Protocol;
    +import com.amazonaws.regions.Region;
    +import com.amazonaws.regions.Regions;
    +
    +/**
    + * This is a base class for NiFi AWS processors.  It contains basic property descriptors, AWS credentials, and relationships, and
    + * is not dependent on AmazonWebServiceClient classes.  Its subclasses add support for interacting with the respective AWS
    + * clients.
    + *
    + * @see AbstractAWSCredentialsProviderProcessor
    + * @see AbstractAWSProcessor
    + * @see AmazonWebServiceClient
    + */
    +public abstract class AbstractBaseAWSProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +                .description("FlowFiles are routed to success relationship").build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +                .description("FlowFiles are routed to failure relationship").build();
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    +    /**
    +     * AWS credentials provider service
    +     *
    +     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
    +     */
    +    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
    +            .name("AWS Credentials Provider service")
    +            .description("The Controller Service that is used to obtain aws credentials provider")
    +            .required(false)
    +            .identifiesControllerService(AWSCredentialsProviderService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder()
    +                .name("Credentials File")
    +                .expressionLanguageSupported(false)
    +                .required(false)
    +                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +                .build();
    +    public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
    +                .name("Access Key")
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .sensitive(true)
    +                .build();
    +    public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
    +                .name("Secret Key")
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .sensitive(true)
    +                .build();
    +    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
    +                .name("Region")
    +                .required(true)
    +                .allowableValues(getAvailableRegions())
    +                .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
    +                .build();
    +    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +                .name("Communications Timeout")
    +                .required(true)
    +                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +                .defaultValue("30 secs")
    +                .build();
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +                .name("SSL Context Service")
    +                .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
    +                .required(false)
    +                .identifiesControllerService(SSLContextService.class)
    +                .build();
    +    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
    +                .name("Endpoint Override URL")
    +                .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
    +                        "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
    +                        "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
    +                .required(false)
    +                .addValidator(StandardValidators.URL_VALIDATOR)
    +                .build();
    +    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
    +            .name("Proxy Host")
    +            .description("Proxy host name or IP")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
    +            .name("Proxy Host Port")
    +            .description("Proxy host port")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    protected volatile Region region;
    +    protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
    +    protected static final String DEFAULT_USER_AGENT = "NiFi";
    +
    +    private static AllowableValue createAllowableValue(final Regions regions) {
    +        return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
    +    }
    +
    +    private static AllowableValue[] getAvailableRegions() {
    +        final List<AllowableValue> values = new ArrayList<>();
    +        for (final Regions regions : Regions.values()) {
    +            values.add(createAllowableValue(regions));
    +        }
    +
    +        return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]);
    +    }
    +
    +    public AbstractBaseAWSProcessor() {
    +        super();
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    +        final ProcessSession session = sessionFactory.createSession();
    +        try {
    +            onTrigger(context, session);
    +            session.commit();
    +        } catch (final Throwable t) {
    +            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
    +            session.rollback(true);
    +            throw t;
    +        }
    +    }
    --- End diff --
    
    Consider sub-classing from AbstractProcessor, since the above code is already provided by it.
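    
    For reference, AbstractProcessor already provides roughly this wrapper (paraphrased, not the exact framework source):
    
    ```
    // in AbstractProcessor
    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession();
        try {
            onTrigger(context, session);
            session.commit();
        } catch (final Throwable t) {
            session.rollback(true);
            throw t;
        }
    }

    public abstract void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException;
    ```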


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by simplesteph <gi...@git.apache.org>.
Github user simplesteph commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    +1 on a separate NAR for now. In the short term that's how users (like me) may benefit from it! Should be a quick win as well.


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r63047926
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/consumer/GetKinesis.java ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.consumer;
    +
    +import java.io.ByteArrayInputStream;
    +import java.net.InetAddress;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnShutdown;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
    +import org.apache.nifi.util.StopWatch;
    +
    +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
    +import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
    +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
    +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
    +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
    +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
    +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
    +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
    +import com.amazonaws.services.kinesis.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_FORBIDDEN)
    +@Tags({ "amazon", "aws", "kinesis", "get", "stream" })
    +@CapabilityDescription("Get the records from the specified Amazon Kinesis stream")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = GetKinesis.AWS_KINESIS_CONSUMER_RECORD_APPROX_ARRIVAL_TIMESTAMP, description = "Approximate arrival time of the record"),
    +    @WritesAttribute(attribute = GetKinesis.AWS_KINESIS_CONSUMER_RECORD_PARTITION_KEY, description = "Partition key of the record"),
    +    @WritesAttribute(attribute = GetKinesis.AWS_KINESIS_CONSUMER_RECORD_SEQUENCE_NUMBER, description = "Sequence number of the record"),
    +    @WritesAttribute(attribute = GetKinesis.AWS_KINESIS_CONSUMER_MILLIS_SECONDS_BEHIND, description = "Consumer lag for processing records"),
    +    @WritesAttribute(attribute = GetKinesis.KINESIS_CONSUMER_RECORD_START_TIMESTAMP, description = "Timestamp when the particular batch of records was processed "),
    +    @WritesAttribute(attribute = GetKinesis.KINESIS_CONSUMER_RECORD_NUMBER, description = "Record number of the record processed in that batch")
    +})
    +public class GetKinesis extends AbstractKinesisConsumerProcessor implements RecordsHandler {
    +
    +    /**
    +     * Attributes written by processor
    +     */
    +    public static final String AWS_KINESIS_CONSUMER_RECORD_PARTITION_KEY = "aws.kinesis.consumer.record.partition.key";
    +    public static final String AWS_KINESIS_CONSUMER_RECORD_SEQUENCE_NUMBER = "aws.kinesis.consumer.record.sequence.number";
    +    public static final String AWS_KINESIS_CONSUMER_RECORD_APPROX_ARRIVAL_TIMESTAMP = "aws.kinesis.consumer.record.approx.arrival.timestamp";
    +    public static final String AWS_KINESIS_CONSUMER_MILLIS_SECONDS_BEHIND = "aws.kinesis.consumer.record.milli.seconds.behind";
    +    public static final String KINESIS_CONSUMER_RECORD_START_TIMESTAMP = "kinesis.consumer.record.start.timestamp";
    +    public static final String KINESIS_CONSUMER_RECORD_NUMBER = "kinesis.consumer.record.number";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(REGION,
    +            AWS_CREDENTIALS_PROVIDER_SERVICE, KINESIS_STREAM_NAME, KINESIS_CONSUMER_APPLICATION_NAME,
    +            KINESIS_CONSUMER_WORKER_ID_PREFIX, BATCH_SIZE, KINESIS_CONSUMER_INITIAL_POSITION_IN_STREAM,
    +            KINESIS_CONSUMER_DEFAULT_FAILOVER_TIME_MILLIS, KINESIS_CONSUMER_DEFAULT_MAX_RECORDS,
    +            KINESIS_CONSUMER_DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, KINESIS_CONSUMER_DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
    +            KINESIS_CONSUMER_DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, KINESIS_CONSUMER_DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
    +            KINESIS_CONSUMER_DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
    +            KINESIS_CONSUMER_DEFAULT_TASK_BACKOFF_TIME_MILLIS, KINESIS_CONSUMER_DEFAULT_METRICS_BUFFER_TIME_MILLIS,
    +            KINESIS_CONSUMER_DEFAULT_METRICS_MAX_QUEUE_SIZE, KINESIS_CONSUMER_DEFAULT_METRICS_LEVEL));
    +
    +    protected Worker consumerWorker;
    +
    +    protected int batchSize;
    +    protected ExecutorService executor = Executors.newCachedThreadPool();
    +
    +    private String streamName;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws ProcessException {
    +
    +        final AWSCredentialsProviderService awsCredentialsProviderService = context
    +                .getProperty(GetKinesis.AWS_CREDENTIALS_PROVIDER_SERVICE)
    +                .asControllerService(AWSCredentialsProviderService.class);
    +
    +        streamName = context.getProperty(GetKinesis.KINESIS_STREAM_NAME).getValue();
    +        KinesisClientLibConfiguration config;
    +
    +        try {
    +            config = new KinesisClientLibConfiguration(
    +                context.getProperty(GetKinesis.KINESIS_CONSUMER_APPLICATION_NAME).getValue(),
    +                context.getProperty(GetKinesis.KINESIS_STREAM_NAME).getValue(),
    +                awsCredentialsProviderService.getCredentialsProvider(),
    +                context.getProperty(GetKinesis.KINESIS_CONSUMER_WORKER_ID_PREFIX).getValue()+ ":"
    +                    + InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID().toString())
    +                    .withRegionName(context.getProperty(GetKinesis.REGION).getValue())
    +                    .withInitialPositionInStream(
    +                         InitialPositionInStream.valueOf(
    +                             context.getProperty(KINESIS_CONSUMER_INITIAL_POSITION_IN_STREAM).getValue()))
    +                    .withFailoverTimeMillis(
    +                             context.getProperty(KINESIS_CONSUMER_DEFAULT_FAILOVER_TIME_MILLIS).asLong())
    +                    .withMaxRecords(context.getProperty(KINESIS_CONSUMER_DEFAULT_MAX_RECORDS).asInteger())
    +                    .withIdleTimeBetweenReadsInMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_IDLETIME_BETWEEN_READS_MILLIS).asLong())
    +                    .withCallProcessRecordsEvenForEmptyRecordList(!context.getProperty(KINESIS_CONSUMER_DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST).asBoolean())
    +                    .withParentShardPollIntervalMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS).asLong())
    +                    .withShardSyncIntervalMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_SHARD_SYNC_INTERVAL_MILLIS).asLong())
    +                    .withCleanupLeasesUponShardCompletion(context.getProperty(KINESIS_CONSUMER_DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION).asBoolean())
    +                    .withTaskBackoffTimeMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_TASK_BACKOFF_TIME_MILLIS).asLong())
    +                    .withMetricsBufferTimeMillis(context.getProperty(KINESIS_CONSUMER_DEFAULT_METRICS_BUFFER_TIME_MILLIS).asLong())
    +                    .withMetricsMaxQueueSize(context.getProperty(KINESIS_CONSUMER_DEFAULT_METRICS_MAX_QUEUE_SIZE).asInteger())
    +                    .withMetricsLevel(context.getProperty(KINESIS_CONSUMER_DEFAULT_METRICS_LEVEL).getValue());
    +
    +            batchSize = context.getProperty(GetKinesis.BATCH_SIZE).asInteger();
    +            config.withMaxRecords(batchSize);
    +
    +            KinesisRecordProcessorFactory kinesisRecordProcessorFactory = new KinesisRecordProcessorFactory(this);
    +            consumerWorker = new Worker
    +                .Builder()
    +                .recordProcessorFactory(kinesisRecordProcessorFactory)
    +                .config(config)
    +                .build();
    +
    +            executor.execute(consumerWorker);
    +        } catch (Exception e) {
    +            throw new ProcessException(e);
    +        }
    +    }
    +
    +    @OnShutdown
    +    public void onShutdown() {
    +        if (consumerWorker != null)
    +            consumerWorker.shutdown();
    +        executor.shutdownNow();
    --- End diff --
    
    @olegz  - The consumerWorker.shutdown() only sets a flag and does not throw an exception.  If I've missed anything, please advise.


---

Re: [GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by Joe Witt <jo...@gmail.com>.
Ahh just saw this.  Will take a look into licensing.
On Jun 8, 2016 2:31 PM, "jvwing" <gi...@git.apache.org> wrote:

> Github user jvwing commented on the issue:
>
>     https://github.com/apache/nifi/pull/239
>
>     @joewitt, would you please help us with the licensing/notice
> requirements for using the Kinesis Client Library and Kinesis Producer
> Library?  The Kinesis libraries are licensed under the [Amazon Software
> License](https://aws.amazon.com/asl/).  This does not appear on the
> published [list of Apache-compatible licenses](
> http://www.apache.org/legal/resolved.html#category-a).
>
>     The Apache Spark project includes comparable use of the Kinesis
> library, although they have chosen to [present their Kinesis integration as
> an optional add-on](
> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html).
> Comparable code is in fact [checked into the Spark repo](
> https://github.com/apache/spark/tree/master/external), but I did not find
> mention of the ASL in a NOTICE file. I was really hoping to copy and
> paste.  I found a [JIRA issue raised by the Spark team for the license
> discussion](https://issues.apache.org/jira/browse/LEGAL-198) which
> discusses the add-on nature of the component, but not specific referencing
> language.
>
>     Is this OK?  How can we determine what needs to be added to the NOTICE
> file in nifi-aws-nar?
>
>
>
>

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @joewitt, would you please help us with the licensing/notice requirements for using the Kinesis Client Library and Kinesis Producer Library?  The Kinesis libraries are licensed under the [Amazon Software License](https://aws.amazon.com/asl/).  This does not appear on the published [list of Apache-compatible licenses](http://www.apache.org/legal/resolved.html#category-a).
    
    The Apache Spark project includes comparable use of the Kinesis library, although they have chosen to [present their Kinesis integration as an optional add-on](http://spark.apache.org/docs/latest/streaming-kinesis-integration.html).  Comparable code is in fact [checked into the Spark repo](https://github.com/apache/spark/tree/master/external), but I did not find mention of the ASL in a NOTICE file. I was really hoping to copy and paste.  I found a [JIRA issue raised by the Spark team for the license discussion](https://issues.apache.org/jira/browse/LEGAL-198) which discusses the add-on nature of the component, but not specific referencing language.
    
    Is this OK?  How can we determine what needs to be added to the NOTICE file in nifi-aws-nar?




[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/239



[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Hi @jvwing:  
    
    Regarding exception handling - the Kinesis client library automatically retries from the last checkpoint created in GetKinesisStream if there is a Kinesis client error.  
    
    In case of exceptions while handling a record inside the GetKinesisStream (line 220)
    
    {code:GetKinesisStream.java}
    catch (Exception e) {
        if (flowFile != null) {
            session.remove(flowFile);
        }
        getLogger().error("Error while handling record: " + e.getMessage());
    }
    {code}
    
    I was thinking that, instead of just removing the FlowFile, I should route it to the failure relationship and still include it in the checkpoint, so that the user can handle it separately and we won't get into a loop where the bad FlowFile is reprocessed.
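
    A rough sketch of what I mean (the attribute name is illustrative):

        catch (Exception e) {
            if (flowFile != null) {
                // route the bad record to failure with some context, instead of dropping it
                flowFile = session.putAttribute(flowFile, "aws.kinesis.error.message", String.valueOf(e.getMessage()));
                session.transfer(flowFile, REL_FAILURE);
            }
            getLogger().error("Error while handling record: " + e.getMessage(), e);
        }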
    
    Let me know what you recommend.
    
    Thanks
    
    Mans
    
    
    




[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Checkpointing after catching an exception would keep things moving; I can see the benefit there.  But what is the advantage of creating a failed FlowFile over just logging the exception?  What would a user do with the FlowFile?  Also, given that we hit an exception while creating a FlowFile on the happy path, how do we safely generate a FlowFile for the failure route?
    
    I recommend catching, logging, and checkpointing.  It may be worth trying to log a bit more detail, like sequence number, partition, etc., in case those are not included in the exception message.
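
    Roughly like this (a sketch against the KCL 1.x interfaces; handleRecord is a hypothetical per-record method, and the logging/checkpoint error handling is simplified):

        @Override
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            for (Record record : processRecordsInput.getRecords()) {
                try {
                    handleRecord(record);
                } catch (Exception e) {
                    // log enough detail to find the record again
                    getLogger().error("Error handling record with partition key {} and sequence number {}",
                            new Object[]{record.getPartitionKey(), record.getSequenceNumber()}, e);
                }
            }
            try {
                // checkpoint past the bad record so the shard keeps moving
                processRecordsInput.getCheckpointer().checkpoint();
            } catch (Exception e) {
                getLogger().error("Failed to checkpoint: " + e.getMessage(), e);
            }
        }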



[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    Hey @jvwing 
    
    I've made the property name changes you recommended and corrected maxBufferedTimeInterval - it is indeed in millis, and the valid values were different from what I had.  I've also added allowable values for boolean properties where I had missed them.
    
    Regarding units (using KB instead of bytes and seconds instead of millis) - I found that some valid ranges were not exact multiples of KB or seconds (e.g., the producer max buffered interval accepts 100-9223372036854775807 millis).  Keeping the raw units creates a one-to-one mapping to the AWS code.  But let me know what you recommend.
    
    I am not quite sure about your #1 comment (Use name as a key and displayName for the UI text, rather than just name).  Let me know what you recommend and I will work on it.
    
    Regarding the properties - KINESIS_PRODUCER_FAIL_IF_THROTTLED: if set to true, the producer will not retry records that fail with a throttling error.  KINESIS_CONSUMER_DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST: avoids invoking the record processor when the record list is empty, presumably for performance reasons.  These settings are available in the underlying libraries, and I wanted to make sure they are all exposed so that users can apply them at their discretion.
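
    For context, these map directly onto the underlying library configuration, along these lines (a sketch; appName, streamName, credentialsProvider and workerId are placeholders):

        // KPL: do not retry records internally when the stream is throttled
        KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
                .setFailIfThrottled(true);

        // KCL: skip the processRecords() callback when a poll returns no records
        KinesisClientLibConfiguration consumerConfig =
                new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId)
                        .withCallProcessRecordsEvenForEmptyRecordList(false);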
    
    Regarding the issue of producer errors, these are known issues and I am including links from what I could find about them:
    #1 - EOF file issue - https://github.com/awslabs/amazon-kinesis-producer/issues/46
    #2 - Exit code 137 is a fatal signal (128 + 9, i.e. the process was killed with SIGKILL, often by the OOM killer)
    #3 - Child process has been shutdown is also producer issue - https://github.com/awslabs/amazon-kinesis-producer/issues/39
    
    If you have more details/stacktrace about the above, let me know and I will try to investigate it further.
    
    Please let me know if I have missed anything.
    
    Thanks again for your detailed review/comments.
    




[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-221619193
  
    @mans2singh, I'm skeptical about the need to change the class hierarchy for all AWS processors.  I understand you want to share a common base class for the Kinesis processors, and use shared AWS code for credentials, property validation, etc.  I also see that AbstractProcessor's functionality is not particularly hard to replicate right now.  But that might change in the future, and most of the AWS processors would benefit from common functionality and compliance without benefiting from the customization.
    
    These Kinesis processors seem more of an exception to the rule than an indicator of the common needs of AWS processors.  As you point out above, the Kinesis producer/consumer processors use a different set of AWS libraries, run an out-of-process native code module, and are driven by different concurrency and flow control concerns.  I don't believe these requirements will be shared by any other AWS processors on the near horizon.
    
    I don't have any big concerns about your implementation of AbstractBaseAWSProcessor, it appears OK.  Our AWS processor class hierarchy is already in need of some repairs, and this could be made to fit.  But I'm not sure that should be done for this PR, driven by the Kinesis processors.
    
    A few other comments:
    
    - I recommend renaming the processors something like "GetKinesisStream" and "PutKinesisStream", to distinguish them from PutKinesisFirehose and possible future Kinesis processors for their analytics product.
    - We should document the AWS permission requirements, at least a link to the AWS docs on the permissions required by the KCL/KPL (Kinesis, DynamoDB, and CloudWatch?).
    - There does not appear to be a lot of unit test coverage of key onTrigger methods and flowfile processing.
    
    I am still working on running the integration tests and doing more detailed code review.




[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by miquillo <gi...@git.apache.org>.
Github user miquillo commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    ouch, that's painful... we've just committed very similar code in NIFI-1769 (PR #362, see reference), since we required a Kinesis Streams processor block. 
    
    I propose to take this solution further though, as it seems more mature and has already been reviewed a few times. 
    
    My comments:
    
    - When we first read it, the name of the issue/PR didn't make us realize that this processor block had already been built. Kinesis is the umbrella, which currently contains three services: Firehose, Streams and Analytics. This processor block only works for Kinesis Streams, so please rename both the code and the PR/Jira :) (and also move the tests under a 'streams' subfolder)
    - The API explains some restrictions to PutRecords(): 
    
    > Each PutRecords request can support up to 500 records. Each record in the request can be as large as 1 MB, up to a limit of 5 MB for the entire request, including partition keys. Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MB per second.[http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html] 
    
    Data needs to be chunked into multiple PutRecords calls if the number of records is > 500. We used the following (see the sketch after this list):
    `List<List<PutRecordsRequestEntry>> recordChunks = Lists.partition(records, 500);`
    - We added a NR_SHARDS parameter for the sake of resharding (scaling up/down) in the future. Although we didn't implement a resharding mechanism, it's perhaps worth considering.
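
    A sketch of the chunked submission using Guava's Lists.partition (variable names are illustrative; client, records, and streamName are as in the PR's onTrigger):

        // Kinesis PutRecords accepts at most 500 records per request,
        // so split the batch and submit each chunk separately.
        List<List<PutRecordsRequestEntry>> recordChunks = Lists.partition(records, 500);
        for (List<PutRecordsRequestEntry> chunk : recordChunks) {
            PutRecordsRequest request = new PutRecordsRequest();
            request.setStreamName(streamName);
            request.setRecords(chunk);
            PutRecordsResult result = client.putRecords(request);
            // result.getFailedRecordCount() shows whether any entries in this chunk failed
        }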
    
    




[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @simplesteph - Please open a new ticket for a GetKinesis processor; I'll resolve the current ticket.



[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing - I will rework at least the put processor using the standard AWS SDK so that it can be bundled with NiFi NARs.  Let me know if there are any other concerns/recommendations.
    Thanks again.




[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing, @miquillo , @simplesteph - Thanks for your feedback and comments.  
    
    I've updated to code with the following:
    
    1. Moved tests to stream package
    2. Added displayName and name to properties
    3. Changed logging level for successful flow files to debug (to reduce logging output)
    4. Added comments on stream provisioning for integration tests  - I used 10 shards
    5. Moved creation of runner, setting creds file, and stream name to setup in IT tests.
    
    Regarding logging for successful message processing - I agree that info-level logging for all events is a bit verbose.  In the updated code, I've set the level to debug so that users may choose to get details as they deem necessary.  IMHO it's better to give the user complete logs, with the option to turn them off, rather than a fixed number, which could cause confusion if they are trying to trace their events.  Once users are satisfied with tracing, they can set the logging level back to info to reduce output.
    
    Regarding logging for failed flow files - IMHO, if we cap the number of flow files shown in the logs, the user might not get the complete picture.  Most errors (I believe) will be resolved during configuration and setup; at runtime there should be only a few errors, so there should not be many flow files in the logs.
    
    However, if the consensus is to limit the logged flow files to a fixed number (say 10), for both the success and failure cases, let me know and I will work on it.  I was just trying to keep things simple and complete.
    
    Regarding the package and naming recommendations - the PutKinesis processor was in the stream package but the tests were not, so I've moved them there.  If the consensus is to rename the put processor to PutKinesisStream, let me know and I will update it.
    
    Thanks again for your feedback and let me know your thoughts.
    
    Mans



[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    I also recommend some modifications to the property definitions in both AbstractKinesisConsumer and AbstractKinesisProducer:
    
    * Use `name` as a key and `displayName` for the UI text, rather than just `name`.  This is a recommended best practice that has come up recently in other PRs.
    * Consider using NiFi notation for data sizes like "1 KB" instead of a long count of bytes.  Some properties include KINESIS_PRODUCER_AGGREGATION_MAX_SIZE, KINESIS_PRODUCER_COLLECTION_MAX_SIZE.
    * Consider using NiFi notation for time span inputs like "1 min" instead of a long count of milliseconds.  Some properties include KINESIS_PRODUCER_MAX_BUFFER_INTERVAL, KINESIS_PRODUCER_TLS_CONNECT_TIMEOUT, and KINESIS_PRODUCER_REQUEST_TIMEOUT, KINESIS_CONSUMER_DEFAULT_FAILOVER_TIME_MILLIS, KINESIS_CONSUMER_DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, KINESIS_CONSUMER_DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, KINESIS_CONSUMER_DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, KINESIS_CONSUMER_DEFAULT_TASK_BACKOFF_TIME_MILLIS, KINESIS_CONSUMER_DEFAULT_METRICS_BUFFER_TIME_MILLIS.
    
    The [BinFiles](https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java) class contains examples of defining both data size and time span properties; a minimal sketch also follows at the end of this list.
    
    * Boolean properties should have an allowableValues list.  Some do, like KINESIS_PRODUCER_AGGREGATION_ENABLED, and some do not - KINESIS_CONSUMER_DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST and KINESIS_CONSUMER_DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION.
    * The producer "Max Buffer Interval" KINESIS_PRODUCER_MAX_BUFFER_INTERVAL description says that the unit is seconds, and the default is 60.  From looking at the [JavaDoc on setRecordMaxBufferedTime()](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L1063), the unit appears to be milliseconds.  Do I understand that correctly?
    * I like that you provided defaults for all of the properties where it is possible.  I didn't have to fill out much to get started, which is always nice.  There remains a bewildering array of properties derived from the underlying KPL/KCL configuration, and I'm not sure they all make sense to expose.  In many cases, the processor implementation effectively includes an answer, doesn't it?  What different behavior would a user expect from changing KINESIS_PRODUCER_FAIL_IF_THROTTLED, or KINESIS_CONSUMER_DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST?
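
    To illustrate the name/displayName and unit notation points together, a minimal sketch (property names are hypothetical; values convert back via asTimePeriod/asDataSize):

        public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder()
                .name("kinesis-producer-max-buffer-interval")   // stable machine-readable key
                .displayName("Max Buffer Interval")             // text shown in the UI
                .description("Maximum time a record may be buffered before it is sent.")
                .required(true)
                .defaultValue("100 millis")
                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
                .build();

        public static final PropertyDescriptor AGGREGATION_MAX_SIZE = new PropertyDescriptor.Builder()
                .name("kinesis-producer-aggregation-max-size")
                .displayName("Aggregation Max Size")
                .description("Maximum size of an aggregated record.")
                .required(true)
                .defaultValue("64 KB")
                .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
                .build();

        // in the processor, convert back to the units the KPL expects
        long maxBufferedMillis = context.getProperty(MAX_BUFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
        long aggregationMaxBytes = context.getProperty(AGGREGATION_MAX_SIZE).asDataSize(DataUnit.B).longValue();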




[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82723029
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.stream;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.model.PutRecordsRequest;
    +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
    +import com.amazonaws.services.kinesis.model.PutRecordsResult;
    +import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis. "
    +    + "In order to send data to Kinesis, the stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.error.message", description = "Error message on posting message to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")})
    +public class PutKinesis extends AbstractKinesisProcessor {
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
    +
    +    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    +
    +    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    +
    +    public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder()
    +        .displayName("Amazon Kinesis Stream Partition Key")
    +        .name("amazon-kinesis-stream-partition-key")
    +        .description("The partition key attribute.  If it is not set, a random value is used")
    +        .expressionLanguageSupported(true)
    +        .defaultValue("${kinesis.partition.key}")
    +        .required(false)
    +        .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST,PROXY_HOST_PORT));
    +
    +    /** A random number generator for cases where partition key is not available */
    +    protected Random randomParitionKeyGenerator = new Random();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
    +        final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue();
    +
    +        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName,
    +           AWS_KINESIS_ERROR_MESSAGE);
    +
    +        final AmazonKinesisClient client = getClient();
    +
    +        try {
    +            List<PutRecordsRequestEntry> records = new ArrayList<>();
    +
    +            List<FlowFile> failedFlowFiles = new ArrayList<>();
    +            List<FlowFile> successfulFlowFiles = new ArrayList<>();
    +
    +            // Prepare batch of records
    +            for (int i = 0; i < flowFiles.size(); i++) {
    +                FlowFile flowFile = flowFiles.get(i);
    +
    +                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +                session.exportTo(flowFile, baos);
    +                PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
    +
    +                String partitionKey = context.getProperty(PutKinesis.KINESIS_PARTITION_KEY)
    +                    .evaluateAttributeExpressions(flowFiles.get(i)).getValue();
    +
    +                if ( ! StringUtils.isBlank(partitionKey) ) {
    +                    record.setPartitionKey(partitionKey);
    +                } else {
    +                    record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt()));
    +                }
    +
    +                records.add(record);
    +            }
    +
    +            if ( records.size() > 0 ) {
    +
    +                PutRecordsRequest putRecordRequest = new PutRecordsRequest();
    +                putRecordRequest.setStreamName(streamName);
    +                putRecordRequest.setRecords(records);
    +                PutRecordsResult results = client.putRecords(putRecordRequest);
    +
    +                List<PutRecordsResultEntry> responseEntries = results.getRecords();
    +                for (int i = 0; i < responseEntries.size(); i++ ) {
    +                    PutRecordsResultEntry entry = responseEntries.get(i);
    +                    FlowFile flowFile = flowFiles.get(i);
    +
    +                    Map<String,String> attributes = new HashMap<>();
    +                    attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
    +                    attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
    +
    +                    if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
    +                        attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
    +                        attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
    +                        flowFile = session.putAllAttributes(flowFile, attributes);
    +                        failedFlowFiles.add(flowFile);
    +                    } else {
    +                        flowFile = session.putAllAttributes(flowFile, attributes);
    +                        successfulFlowFiles.add(flowFile);
    +                    }
    +                }
    +                if ( failedFlowFiles.size() > 0 ) {
    +                    session.transfer(failedFlowFiles, REL_FAILURE);
    +                    getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles});
    --- End diff --
    
    Are you sure you want to log the entire list of failedFlowFiles?  I found it to be a bit overwhelming, even in the expanded bulletin board view.  Maybe a count would be enough detail?  Please see the image:
    
    ![putkinesis-error-bulletin](https://cloud.githubusercontent.com/assets/3151078/19258730/b0553398-8f2f-11e6-9082-b5fb6e2dfe30.png)
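
    E.g., something along these lines would keep the bulletin readable (a sketch):

        if (!failedFlowFiles.isEmpty()) {
            session.transfer(failedFlowFiles, REL_FAILURE);
            // log a count instead of the full list of FlowFiles
            getLogger().error("Failed to publish {} records to Kinesis stream {}",
                    new Object[]{failedFlowFiles.size(), streamName});
        }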




[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @mans2singh, the error messages did not contain anything helpful that indicated it was a memory issue, just the text `com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages`.  Memory was discussed on one of the KPL issue tickets you found (thanks!), so I tried it out.
    
    I'm not sure if we can do anything to handle this, but I don't know Kinesis well.  One option would be to try to restart the child process when this error happens.  But I may have actually been doing that.  I stopped and started the PutKinesisStream processor several times, which recreates the KinesisProducer instance, and the [KinesisProducer javadoc](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java) claims this spawns a new child process.  This might have actually worked, briefly, except that the new child process died shortly afterwards because of the same memory constraint.  So even if we automated that troubleshooting step in PutKinesisStream, the outcome for this particular issue would be the same.  Part of transparently using the KPL/KCL is that users will have to know how to troubleshoot them directly.
    
    There is no special trick to getting NiFi to work on EC2; their Linux VMs are very similar to any other.  I am used to EC2, and find it convenient for testing.  EC2's "micro" sized instances, which I use because I am terribly cheap, double as a handy way to test resource constraints.  Have you had trouble with it?



[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82723417
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java ---
    @@ -0,0 +1,475 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.processors.aws.kinesis.stream.PutKinesis;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    --- End diff --
    
    Could we add a comment about what kind of Kinesis Stream setup is required to run these tests?  I was able to run all of the integration tests, but it would have helped if I knew how many shards were required.  I started with just one shard, because I'm cheap, and that wasn't enough to get through the tests that exceed the 1 MB/sec provisioned throughput.  Testing with various shard counts suggests the tests are logically OK, the only issue seems to be individual and collective provisioned throughput.  Finally, I forked over the extra $0.075 for five shards and the tests worked fine together.  I agree with testing the 1 MB buffer max handling, I just didn't know how to set it up right the first time.
    
    What was your setup like?
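
    Even a short note on the test class would do, e.g. (the shard count is what worked for me):

        /**
         * This test contains both unit and integration tests (integration tests are
         * ignored by default).  The integration tests assume a provisioned Kinesis
         * stream with at least 5 shards; each shard supports writes of up to
         * 1 MB/sec, and several tests intentionally exceed 1 MB/sec in total.
         */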



[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-214554805
  
    @joewitt - I got busy with assignments at work and apologize for not getting back to respond to the comments.  Would it be possible to push this pull request to a later release?  I do want to wrap up this request and will try to get back to it as soon as I can.



[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r63047805
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractBaseAWSProcessor.java ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import com.amazonaws.Protocol;
    +import com.amazonaws.regions.Region;
    +import com.amazonaws.regions.Regions;
    +
    +/**
    + * This is a base class of Nifi AWS Processors.  This class contains basic property descriptors, AWS credentials and relationships and
    + * is not dependent on AmazonWebServiceClient classes.  It's subclasses add support for interacting with AWS respective
    + * clients.
    + *
    + * @see AbstractAWSCredentialsProviderProcessor
    + * @see AbstractAWSProcessor
    + * @see AmazonWebServiceClient
    + */
    +public abstract class AbstractBaseAWSProcessor extends AbstractSessionFactoryProcessor {
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +                .description("FlowFiles are routed to success relationship").build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +                .description("FlowFiles are routed to failure relationship").build();
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    +    /**
    +     * AWS credentials provider service
    +     *
    +     * @see  <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
    +     */
    +    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
    +            .name("AWS Credentials Provider service")
    +            .description("The Controller Service that is used to obtain aws credentials provider")
    +            .required(false)
    +            .identifiesControllerService(AWSCredentialsProviderService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder()
    +                .name("Credentials File")
    +                .expressionLanguageSupported(false)
    +                .required(false)
    +                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +                .build();
    +    public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
    +                .name("Access Key")
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .sensitive(true)
    +                .build();
    +    public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
    +                .name("Secret Key")
    +                .expressionLanguageSupported(true)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .sensitive(true)
    +                .build();
    +    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
    +                .name("Region")
    +                .required(true)
    +                .allowableValues(getAvailableRegions())
    +                .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
    +                .build();
    +    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
    +                .name("Communications Timeout")
    +                .required(true)
    +                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +                .defaultValue("30 secs")
    +                .build();
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +                .name("SSL Context Service")
    +                .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
    +                .required(false)
    +                .identifiesControllerService(SSLContextService.class)
    +                .build();
    +    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
    +                .name("Endpoint Override URL")
    +                .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
    +                        "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
    +                        "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
    +                .required(false)
    +                .addValidator(StandardValidators.URL_VALIDATOR)
    +                .build();
    +    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
    +            .name("Proxy Host")
    +            .description("Proxy host name or IP")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
    +            .name("Proxy Host Port")
    +            .description("Proxy host port")
    +            .expressionLanguageSupported(true)
    +            .required(false)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +
    +    protected volatile Region region;
    +    protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
    +    protected static final String DEFAULT_USER_AGENT = "NiFi";
    +
    +    private static AllowableValue createAllowableValue(final Regions regions) {
    +        return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
    +    }
    +
    +    private static AllowableValue[] getAvailableRegions() {
    +        final List<AllowableValue> values = new ArrayList<>();
    +        for (final Regions regions : Regions.values()) {
    +            values.add(createAllowableValue(regions));
    +        }
    +
    +        return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]);
    +    }
    +
    +    public AbstractBaseAWSProcessor() {
    +        super();
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    +        final ProcessSession session = sessionFactory.createSession();
    +        try {
    +            onTrigger(context, session);
    +            session.commit();
    +        } catch (final Throwable t) {
    +            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
    +            session.rollback(true);
    +            throw t;
    +        }
    +    }
    --- End diff --
    
    I need access to the ProcessSessionFactory to handle asynchronous event processing by KCL.  Please see my notes below and let me know if there is a better way to handle this.
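
    In outline, the consumer side looks like this (a simplified sketch; the sessionFactory field is captured at onTrigger time, and the names are illustrative):

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            // KCL calls this asynchronously, so a session is created on demand
            final ProcessSession session = sessionFactory.createSession();
            try {
                for (Record record : processRecordsInput.getRecords()) {
                    FlowFile flowFile = session.create();
                    flowFile = session.write(flowFile, out -> out.write(record.getData().array()));
                    session.transfer(flowFile, REL_SUCCESS);
                }
                session.commit();
            } catch (Exception e) {
                getLogger().error("Error while handling records: " + e.getMessage(), e);
                session.rollback();
            }
        }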



[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @mans2singh - that sounds great to me!



[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82723082
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.stream;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.model.PutRecordsRequest;
    +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
    +import com.amazonaws.services.kinesis.model.PutRecordsResult;
    +import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis. "
    +    + "In order to send data to Kinesis, the stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.error.message", description = "Error message on posting message to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")})
    +public class PutKinesis extends AbstractKinesisProcessor {
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
    +
    +    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    +
    +    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    +
    +    public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder()
    +        .displayName("Amazon Kinesis Stream Partition Key")
    +        .name("amazon-kinesis-stream-partition-key")
    +        .description("The partition key attribute.  If it is not set, a random value is used")
    +        .expressionLanguageSupported(true)
    +        .defaultValue("${kinesis.partition.key}")
    +        .required(false)
    +        .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST,PROXY_HOST_PORT));
    +
    +    /** A random number generator for cases where partition key is not available */
    +    protected Random randomParitionKeyGenerator = new Random();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
    +        final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue();
    +
    +        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName,
    +           AWS_KINESIS_ERROR_MESSAGE);
    +
    +        final AmazonKinesisClient client = getClient();
    +
    +        try {
    +            List<PutRecordsRequestEntry> records = new ArrayList<>();
    +
    +            List<FlowFile> failedFlowFiles = new ArrayList<>();
    +            List<FlowFile> successfulFlowFiles = new ArrayList<>();
    +
    +            // Prepare batch of records
    +            for (int i = 0; i < flowFiles.size(); i++) {
    +                FlowFile flowFile = flowFiles.get(i);
    +
    +                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +                session.exportTo(flowFile, baos);
    +                PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
    +
    +                String partitionKey = context.getProperty(PutKinesis.KINESIS_PARTITION_KEY)
    +                    .evaluateAttributeExpressions(flowFiles.get(i)).getValue();
    +
    +                if ( ! StringUtils.isBlank(partitionKey) ) {
    +                    record.setPartitionKey(partitionKey);
    +                } else {
    +                    record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt()));
    +                }
    +
    +                records.add(record);
    +            }
    +
    +            if ( records.size() > 0 ) {
    +
    +                PutRecordsRequest putRecordRequest = new PutRecordsRequest();
    +                putRecordRequest.setStreamName(streamName);
    +                putRecordRequest.setRecords(records);
    +                PutRecordsResult results = client.putRecords(putRecordRequest);
    +
    +                List<PutRecordsResultEntry> responseEntries = results.getRecords();
    +                for (int i = 0; i < responseEntries.size(); i++ ) {
    +                    PutRecordsResultEntry entry = responseEntries.get(i);
    +                    FlowFile flowFile = flowFiles.get(i);
    +
    +                    Map<String,String> attributes = new HashMap<>();
    +                    attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
    +                    attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
    +
    +                    if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
    +                        attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
    +                        attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
    +                        flowFile = session.putAllAttributes(flowFile, attributes);
    +                        failedFlowFiles.add(flowFile);
    +                    } else {
    +                        flowFile = session.putAllAttributes(flowFile, attributes);
    +                        successfulFlowFiles.add(flowFile);
    +                    }
    +                }
    +                if ( failedFlowFiles.size() > 0 ) {
    +                    session.transfer(failedFlowFiles, REL_FAILURE);
    +                    getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles});
    +                }
    +                if ( successfulFlowFiles.size() > 0 ) {
    +                    session.transfer(successfulFlowFiles, REL_SUCCESS);
    +                    getLogger().info("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles});
    --- End diff --
    
    Same concern about logging an entire list of flowfiles.



[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-218325113
  
    Hi Nifi Folks, @apiri, @olegz, @joewitt : 
    
    Thanks for your review comments.  I apologize I had to take care of work related assignments and could not get back to addressing your review comments, earlier.  However, I would like to discuss this pull request with and get your advice/recommendations on it.
    
    Here is a summary some of the changes I made for integrating the Kinesis Consumer/Producer into the aws nifi processors. You probably are already familiar with these but just so that we are on the same page and you can correct/advice me. Currently aws nifi processors extend the class:
    
    `
    public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends AmazonWebServiceClient> extends AbstractAWSProcessor<ClientType>
    `
    
    All the current AWS NiFi processors use AmazonWebServiceClient to connect to AWS services.  There is a similar client for Kinesis: `AmazonKinesisClient`.  However, AWS also provides a newer pair of libraries: `KinesisProducer` from the Kinesis Producer Library (KPL) and the `KinesisRecordProcessor`-based `Worker` from the Kinesis Client Library (KCL).  These libraries handle batching, packing records, parallelism, resharding, workload balancing, monitoring, asynchronous processing, retries, etc.  When using KCL/KPL we do not need the `AmazonWebServiceClient`-based client classes.  I chose to implement the NiFi AWS Kinesis Get and Put processors using the KCL/KPL libraries rather than the older AmazonKinesisClient.
    
    I could have implemented the NiFi Kinesis processors separately, but I thought it would be better to have a common base class hierarchy for all NiFi AWS processors.  In order to make things consistent across all the NiFi AWS processors (those that use `AmazonWebServiceClient` and those that don't, e.g. `KinesisProducer`), I created a base class `AbstractBaseAWSProcessor`, which is a simple, non-generic class.  This class has two subclasses:
    
    - `AbstractAWSProcessor`, which is the superclass of all Nifi AWS processors using the generic `AmazonWebServiceClient` (e.g. S3)
    - `AbstractKinesisProcessor`, which does not use `AmazonWebServiceClient` and instead uses the KPL/KCL classes
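    
    A rough sketch of the resulting hierarchy (the declarations are illustrative, not the exact code in this PR; each class would live in its own file):
    
    `
    import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    
    import com.amazonaws.AmazonWebServiceClient;
    
    // Non-generic root holding the shared AWS plumbing (credentials, region, proxy, ...).
    public abstract class AbstractBaseAWSProcessor extends AbstractSessionFactoryProcessor {
    }
    
    // Processors backed by an AmazonWebServiceClient (S3, SNS, SQS, ...).
    public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient>
            extends AbstractBaseAWSProcessor {
    }
    
    // Processors backed by the KPL/KCL classes instead of AmazonWebServiceClient.
    public abstract class AbstractKinesisProcessor extends AbstractBaseAWSProcessor {
    }
    `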
    
    I used `AbstractSessionFactoryProcessor` as the base class for `AbstractBaseAWSProcessor` rather than `AbstractProcessor`.  The reason is that the KCL `Worker` instance invokes `IRecordProcessorFactory` asynchronously to create an `IRecordProcessor`, which it uses to handle records as they arrive by invoking the following method:
    
    `void processRecords(ProcessRecordsInput processRecordsInput)` 
    
    In order to create the `ProcessSession` on demand (for GetKinesis), we need access to the `SessionFactory`.  The `AbstractBaseAWSProcessor` therefore extends `AbstractSessionFactoryProcessor`, and it has two `onTrigger` methods covering both cases: the existing Nifi AWS processors and the PutKinesis processor, which receive a session per trigger, and GetKinesis, which creates a session on demand when records arrive.  Here is the relevant code:
    
    `
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession();
        try {
            onTrigger(context, session);
            session.commit();
        } catch (final Throwable t) {
            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
            session.rollback(true);
            throw t;
        }
    }
    
    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
    `
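    
    On the GetKinesis side, a record processor created by the factory can then build a session on demand, roughly like this (a sketch of the approach; the class name and relationship are illustrative, not the PR's actual code):
    
    `
    import java.nio.ByteBuffer;
    
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.ProcessSessionFactory;
    import org.apache.nifi.processor.Relationship;
    
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
    import com.amazonaws.services.kinesis.model.Record;
    
    public class SessionPerBatchRecordProcessor implements IRecordProcessor {
    
        private static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    
        private final ProcessSessionFactory sessionFactory; // captured from onTrigger
    
        public SessionPerBatchRecordProcessor(final ProcessSessionFactory sessionFactory) {
            this.sessionFactory = sessionFactory;
        }
    
        @Override
        public void initialize(final InitializationInput initializationInput) {
        }
    
        @Override
        public void processRecords(final ProcessRecordsInput processRecordsInput) {
            final ProcessSession session = sessionFactory.createSession(); // session on demand
            for (final Record record : processRecordsInput.getRecords()) {
                final ByteBuffer data = record.getData();
                final byte[] bytes = new byte[data.remaining()];
                data.get(bytes);
                FlowFile flowFile = session.create();
                flowFile = session.write(flowFile, out -> out.write(bytes));
                session.transfer(flowFile, REL_SUCCESS);
            }
            session.commit();
        }
    
        @Override
        public void shutdown(final ShutdownInput shutdownInput) {
        }
    }
    `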
    
    Finally, the `Worker` used in GetKinesis is a `Runnable`, and I used an `ExecutorService` to launch it and manage its lifecycle.
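    
    A minimal sketch of that lifecycle handling (the wrapper class is illustrative; constructing the `Worker` itself is elided because it depends on the processor's configuration):
    
    `
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
    
    public class WorkerLifecycle {
    
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final Worker consumerWorker;
    
        public WorkerLifecycle(final Worker consumerWorker) {
            this.consumerWorker = consumerWorker;
        }
    
        public void start() {
            executor.submit(consumerWorker); // Worker implements Runnable
        }
    
        public void stop() {
            consumerWorker.shutdown();  // sets the worker's shutdown flag; does not throw
            executor.shutdown();        // then let the executor terminate
        }
    }
    `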
    
    I've migrated the base classes of all the current Nifi AWS processors to work with the new class hierarchy.
    
    If you have any thoughts/recommendations/etc on how I can implement the Kinesis processors using KCL/KPL please let me know.
    
    To address the review comments from @olegz:
    1. Regarding the empty onScheduled method and constructor - I can remove them.
    2. Regarding sub-classing from `AbstractProcessor` - I used `AbstractSessionFactoryProcessor` to get access to the SessionFactory on demand.
    3. Regarding `consumerWorker.shutdown()` throwing an exception - the shutdown method on the worker only sets a shutdown flag and does not throw any exception.  But if you have any advice, let me know.
    
    Thanks again for your advice/support and I hope to get your feedback incorporated as soon as I can.
    
    Mans



---

[GitHub] nifi pull request #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/239#discussion_r82723115
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.stream;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.model.PutRecordsRequest;
    +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
    +import com.amazonaws.services.kinesis.model.PutRecordsResult;
    +import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis stream. "
    +    + "In order to send data to Kinesis, the stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.error.message", description = "Error message on posting message to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"),
    +    @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")})
    +public class PutKinesis extends AbstractKinesisProcessor {
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
    +
    +    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    +
    +    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    +
    +    public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder()
    +        .displayName("Amazon Kinesis Stream Partition Key")
    +        .name("amazon-kinesis-stream-partition-key")
    +        .description("The partition key attribute.  If it is not set, a random value is used")
    +        .expressionLanguageSupported(true)
    +        .defaultValue("${kinesis.partition.key}")
    +        .required(false)
    +        .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));
    +
    +    /** A random number generator for cases where the partition key is not available */
    +    protected Random randomPartitionKeyGenerator = new Random();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
    +        final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue();
    +
    +        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName,
    +           AWS_KINESIS_ERROR_MESSAGE);
    +
    +        final AmazonKinesisClient client = getClient();
    +
    +        try {
    +            List<PutRecordsRequestEntry> records = new ArrayList<>();
    +
    +            List<FlowFile> failedFlowFiles = new ArrayList<>();
    +            List<FlowFile> successfulFlowFiles = new ArrayList<>();
    +
    +            // Prepare batch of records
    +            for (int i = 0; i < flowFiles.size(); i++) {
    +                FlowFile flowFile = flowFiles.get(i);
    +
    +                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +                session.exportTo(flowFile, baos);
    +                PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
    +
    +                String partitionKey = context.getProperty(PutKinesis.KINESIS_PARTITION_KEY)
    +                    .evaluateAttributeExpressions(flowFiles.get(i)).getValue();
    +
    +                if ( ! StringUtils.isBlank(partitionKey) ) {
    +                    record.setPartitionKey(partitionKey);
    +                } else {
    +                    record.setPartitionKey(Integer.toString(randomPartitionKeyGenerator.nextInt()));
    +                }
    +
    +                records.add(record);
    +            }
    +
    +            if ( records.size() > 0 ) {
    +
    +                PutRecordsRequest putRecordRequest = new PutRecordsRequest();
    +                putRecordRequest.setStreamName(streamName);
    +                putRecordRequest.setRecords(records);
    +                PutRecordsResult results = client.putRecords(putRecordRequest);
    +
    +                List<PutRecordsResultEntry> responseEntries = results.getRecords();
    +                for (int i = 0; i < responseEntries.size(); i++ ) {
    +                    PutRecordsResultEntry entry = responseEntries.get(i);
    +                    FlowFile flowFile = flowFiles.get(i);
    +
    +                    Map<String,String> attributes = new HashMap<>();
    +                    attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
    +                    attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
    +
    +                    if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
    +                        attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
    +                        attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
    +                        flowFile = session.putAllAttributes(flowFile, attributes);
    +                        failedFlowFiles.add(flowFile);
    +                    } else {
    +                        flowFile = session.putAllAttributes(flowFile, attributes);
    +                        successfulFlowFiles.add(flowFile);
    +                    }
    +                }
    +                if ( failedFlowFiles.size() > 0 ) {
    +                    session.transfer(failedFlowFiles, REL_FAILURE);
    +                    getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles});
    +                }
    +                if ( successfulFlowFiles.size() > 0 ) {
    +                    session.transfer(successfulFlowFiles, REL_SUCCESS);
    +                    getLogger().info("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles});
    +                }
    +                records.clear();
    +            }
    +
    +        } catch (final Exception exception) {
    +            getLogger().error("Failed to publish to kinesis {} with exception {}", new Object[]{flowFiles, exception});
    --- End diff --
    
    Same concern about logging an entire list of flowfiles.
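    
    One common remedy (an illustrative alternative, not necessarily the fix that was adopted) is to log the count and stream name instead of the collection itself:
    
        // Log how many flowfiles failed rather than the whole collection.
        getLogger().error("Failed to publish {} records to Kinesis stream {}",
                new Object[]{failedFlowFiles.size(), streamName});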


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing - Made the following updates:
    
    1. I've updated the logging to put the exception first.
    2. Changed names to use the Stream suffix.
    
    Let me know if there is anything else outstanding.
    
    Mans


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing - I've created the Kinesis put processor using the standard AWS SDK.  Please let me know your thoughts.
    
    Thanks


---

[GitHub] nifi pull request: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/239#issuecomment-221567920
  
    Hey @jvwing @olegz @apiri - Just wondering if you've had time to review this request.  If you have any feedback/recommendations, please let me know.  Thanks.


---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Get and Put Processors

Posted by jvwing <gi...@git.apache.org>.
Github user jvwing commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    The suggested use of `name` and `displayName` on PropertyDescriptors has been shared around a lot over the last few days.  You can read the [backstory thread](http://mail-archives.apache.org/mod_mbox/nifi-dev/201605.mbox/%3C5A6FDF1E-1889-46FE-A3C4-5D2F0A905979@apache.org%3E) on the best practice and the reasons for it.  The short, short version is to provide `name` as a computer-readable key to saved settings in templates and flows, and `displayName` as a human-readable description which may be changed or translated without breaking compatibility with saved data.  A good example is this PropertyDescriptor from PutS3Object:
    
        public static final PropertyDescriptor SERVER_SIDE_ENCRYPTION = new PropertyDescriptor.Builder()
                .name("server-side-encryption")
                .displayName("Server Side Encryption")
                .description("Specifies the algorithm used for server side encryption.")
                .required(true)
                .allowableValues(NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION)
                .defaultValue(NO_SERVER_SIDE_ENCRYPTION)
                .build();



---

[GitHub] nifi issue #239: Nifi 1540 - AWS Kinesis Put Processor

Posted by miquillo <gi...@git.apache.org>.
Github user miquillo commented on the issue:

    https://github.com/apache/nifi/pull/239
  
    @jvwing I kind of overlooked the 1-500 validator on the BATCH_SIZE property; I didn't realize it actually captures the service's per-request record limit. Go with the approach currently implemented.
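    
    For reference, that kind of range constraint can be captured with a standard validator, e.g. (an illustrative descriptor, not necessarily the exact one in this PR):
    
        public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
                .name("message-batch-size")
                .displayName("Message Batch Size")
                .description("Number of records to send per PutRecords call (1-500).")
                .defaultValue("250")
                .required(false)
                .addValidator(StandardValidators.createLongValidator(1, 500, true))
                .build();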
    
    @mans2singh @jvwing great work! Hope to see this in the next release :)
    



---