Posted to dev@nifi.apache.org by mans2singh <gi...@git.apache.org> on 2016/02/10 06:28:58 UTC

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

GitHub user mans2singh opened a pull request:

    https://github.com/apache/nifi/pull/213

    Nifi 1489 (Support for http proxy) + Nifi 1495 (AWS Kinesis Firehose)

    This pull request combines the HTTP proxy enhancement for the AWS processors (NIFI-1489) and the AWS Kinesis Firehose processor (NIFI-1495).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mans2singh/nifi nifi-1495

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #213
    
----
commit 83c05145ffe9caab932698cb970cde27d905cf81
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-08T19:43:03Z

    Added support for using proxy

commit 2b8772cdee082528ce121fcc9034c78f5960e233
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-09T19:49:05Z

    Added support for Kinesis Firehose

commit 7bf42bbf6038fc0147934ab4ed45cc1729e8e13f
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-10T03:55:38Z

    minor comments

commit fd10d0b76a66ce9fc7868f87c84c1ebc2c25f995
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-08T19:43:03Z

    Added support for using proxy

commit ca28ad9f05bcb1e3a191096bd8ca10de01628bbd
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-09T19:49:05Z

    Added support for Kinesis Firehose

commit 875c961c3181aed367361ea8bceba0fe80f5f274
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-10T03:55:38Z

    minor comments

commit 28e8cfda694a190b58d65a0657a04e92fff5c1ec
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-10T04:01:47Z

    Merge branch 'awsfirehose' of https://github.com/mans2singh/nifi into awsfirehose

commit 1ccfab0916e372c738aeac8cb2ab6556b4d8191f
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-10T05:16:26Z

    explicit check for proxy host and port

commit 4febccf6d3c2157cf0d68009beff5b9923e81d17
Author: mans2singh <ma...@yahoo.com>
Date:   2016-02-10T05:22:19Z

    added comments to kinesis test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-196646278
  
    @aldrin - I've added a max buffer size limit check while getting flow files from the session, as you had recommended.  I've also kept the batch size limit in place, and I've added integration tests to check that the messages are sent once the max buffer size is exceeded.  Can you please review the code and let me know if it meets your design?
    Thanks



[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-197183816
  
    @aldrin @mattyb149 Thanks for your pointers and samples for using the data size validator.  I've made the changes and updated the test cases.  Please let me know if anything else is required.  Thanks again.



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53845184
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL,
    +                  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    --- End diff --
    
    With the alternative validators specified above, it should be possible to completely remove the need for this method.



[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-197294536
  
    @mans2singh Looks great, thanks for getting those changes together and the contribution.  I'll be merging this into master shortly.



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53844010
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +
    +/**
    + * This class provides processor the base class for kinesis firehose
    + */
    +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
    +
    +    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
    +            .name("Amazon Kinesis Firehose Delivery Stream Name")
    +            .description("The name of kinesis firehose delivery stream")
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Max Buffer Interval")
    +            .description("Buffering interval for messages (between 60 and 900 seconds).")
    +            .defaultValue("60")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    --- End diff --
    
    I don't see this property being passed through to the associated KinesisFirehoseClient.  Am I overlooking something?
    
    If so, you should prefer using StandardValidators#createLongValidator to create a bounded range.  That would let you avoid the customValidate logic.
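
    For illustration, an untested sketch of what that could look like for the interval property
    (the 60-900 bounds mirror the range already enforced in customValidate; everything else is
    unchanged from the diff above):

        public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder()
                .name("Max Buffer Interval")
                .description("Buffering interval for messages (between 60 and 900 seconds).")
                .defaultValue("60")
                .required(false)
                // bounded range validator, so no customValidate range check is needed
                .addValidator(StandardValidators.createLongValidator(60, 900, true))
                .build();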



[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r56212702
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    /**
    +     * Max buffer size 1000kb
    +     */
    +    public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFileCandidate = session.get();
    +        if ( flowFileCandidate == null )
    +            return;
    +
    +        long currentBufferSizeBytes = flowFileCandidate.getSize();
    +
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final int maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asInteger() * 1024 * 1000;
    --- End diff --
    
    In conjunction with the Validator change above, we could also make use of .asDataSize for the property instead of an integer.  That will help remove the need for your own conversion.
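
    A rough sketch of what that read could look like, assuming the property is validated as a
    data size (DataUnit is org.apache.nifi.processor.DataUnit):

        // the framework parses values like "1 MB" and converts to the requested unit,
        // so the manual "* 1024 * 1000" arithmetic goes away
        final long maxBufferSizeBytes =
                context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();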



[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-197125258
  
    @mans2singh I think Aldrin was referring to DATA_SIZE_VALIDATOR in org.apache.nifi.processor.util.StandardValidators. If you need custom min/max size (in bytes), you can use StandardValidators.createDataSizeBoundsValidator(minBytesInclusive, maxBytesInclusive) 
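
    As an untested sketch of that approach (the property name, the 1 MB default, and the
    1 B to 128 MB bounds are illustrative, drawn from the discussion above):

        // hypothetical descriptor; accepts values such as "1 MB" or "500 KB"
        public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE = new PropertyDescriptor.Builder()
                .name("Max message buffer size")
                .description("Maximum amount of flow file content to buffer before sending a batch.")
                .defaultValue("1 MB")
                .required(false)
                // or StandardValidators.DATA_SIZE_VALIDATOR if no explicit bounds are needed
                .addValidator(StandardValidators.createDataSizeBoundsValidator(1L, 128L * 1024 * 1024))
                .build();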



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-187907196
  
    @mans2singh I did an initial look over the code.  The approach seems good, but I would like to understand more about the batching process and how we can perform it in a safe manner that does not exhaust the heap so readily.

    Also, would you please edit the PR name to something with NIFI-1495 in it so that the JIRA integration will hopefully link it and include those items on the issue?  There is no need to keep the reference to NIFI-1489, as those changes have been merged and that PR was closed in #209.

    Thanks!



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53846413
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.util.List;
    +
    +import org.apache.nifi.processors.aws.s3.FetchS3Object;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    + */
    +public class ITPutKinesisFirehose {
    +
    +    private TestRunner runner;
    +    protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
    +        runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
    +        runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
    +        runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName");
    +        runner.assertValid();
    +    }
    +
    +    @After
    +    public void tearDown() throws Exception {
    +        runner = null;
    +    }
    +
    +    @Test
    +    public void testCustomValidateBatchSize1Valid() {
    --- End diff --
    
    With the above improvements, these customValidate tests likely add little and could be removed once the validators are adjusted.



[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r56212999
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    /**
    +     * Max buffer size 1000kb
    +     */
    +    public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFileCandidate = session.get();
    +        if ( flowFileCandidate == null )
    +            return;
    +
    +        long currentBufferSizeBytes = flowFileCandidate.getSize();
    +
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final int maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asInteger() * 1024 * 1000;
    +
    +        // Get max batch size messages with size limit defined by maxBufferSizeBytes
    +        List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
    +        flowFiles.add(flowFileCandidate);
    +        for (int i = 1; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
    +            flowFileCandidate = session.get();
    +            if ( flowFileCandidate == null )
    +                break;
    +            currentBufferSizeBytes += flowFileCandidate.getSize();
    +            flowFiles.add(flowFileCandidate);
    +        }
    +
    +        final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
    +        final AmazonKinesisFirehoseClient client = getClient();
    +
    +        try {
    +            List<Record> records = new ArrayList<>();
    +
    +            List<FlowFile> failedFlowFiles = new ArrayList<>();
    +            List<FlowFile> successfulFlowFiles = new ArrayList<>();
    +
    +            // Prepare batch of records
    +            for (int i = 0; i < flowFiles.size(); i++) {
    +                FlowFile flowFile = flowFiles.get(i);
    +
    +                // If flow file too large send it to failure
    --- End diff --
    
    On second thought, I am okay with this.  Since Firehose also batches/aggregates, I do not see any issue with sending batches that may be a bit smaller.



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53844969
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +
    +/**
    + * This class provides processor the base class for kinesis firehose
    + */
    +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
    +
    +    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
    +            .name("Amazon Kinesis Firehose Delivery Stream Name")
    +            .description("The name of kinesis firehose delivery stream")
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Max Buffer Interval")
    +            .description("Buffering interval for messages (between 60 and 900 seconds).")
    +            .defaultValue("60")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
    --- End diff --
    
    Same commentary as above for MAX_BUFFER_INTERVAL.  Not sure how this is being used.



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53893291
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.util.List;
    +
    +import org.apache.nifi.processors.aws.s3.FetchS3Object;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    + */
    +public class ITPutKinesisFirehose {
    --- End diff --
    
    Moved unit test to separate class.



[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-195173926
  
    @mans2singh Yep.  I think we can probably replace the item-count batch size (or make it secondary to the buffer size).  The idea would be to continuously grab FlowFiles until we've reached this threshold; when the next item arrives that would put us over the limit, we transfer it back to the incoming queue with session.transfer(flowFile) **note the lack of relationship**.  We can then create the batch much the same way you do now.  Does that make sense?

    Thanks for your work on this (and all the other great AWS contributions, very popular extensions), and apologies for the lag in following up on this issue.
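
    An untested sketch of that loop (batchSize and maxBufferSizeBytes are assumed to come from
    the processor's BATCH_SIZE and buffer-size properties):

        final List<FlowFile> batch = new ArrayList<>();
        long bufferedBytes = 0L;
        FlowFile candidate;
        while (batch.size() < batchSize && (candidate = session.get()) != null) {
            if (!batch.isEmpty() && bufferedBytes + candidate.getSize() > maxBufferSizeBytes) {
                // this one would push us over the limit: return it to the incoming
                // queue (note the transfer without a relationship)
                session.transfer(candidate);
                break;
            }
            bufferedBytes += candidate.getSize();
            batch.add(candidate);
        }
        // at most batchSize flow files; total size stays at or under maxBufferSizeBytes
        // unless a single flow file already exceeds it on its own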



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53850051
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.util.List;
    +
    +import org.apache.nifi.processors.aws.s3.FetchS3Object;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    + */
    +public class ITPutKinesisFirehose {
    +
    +    private TestRunner runner;
    +    protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
    +        runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
    +        runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
    +        runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName");
    +        runner.assertValid();
    +    }
    +
    +    @After
    +    public void tearDown() throws Exception {
    +        runner = null;
    +    }
    +
    +    @Test
    +    public void testCustomValidateBatchSize1Valid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBatchSize500Valid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500");
    +        runner.assertValid();
    +    }
    +    @Test
    +    public void testCustomValidateBatchSize501InValid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferSize1Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferSize128Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128");
    +        runner.assertValid();
    +    }
    +    @Test
    +    public void testCustomValidateBufferSize129InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval900Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval60Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval901InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval59InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59");
    +        runner.assertNotValid();
    +    }
    +
    +    /**
    +     * Comment out ignore for integration tests (requires creds files)
    +     */
    +    @Test
    +    @Ignore
    +    public void testIntegrationSuccess() throws Exception {
    +        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
    +        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE);
    +        runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "firehose-s3-test");
    +        runner.assertValid();
    +
    +        runner.enqueue("test".getBytes());
    +        runner.run(1);
    +
    +        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 1);
    +
    +        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
    +        final MockFlowFile out = ffs.iterator().next();
    +
    +        out.assertContentEquals("test".getBytes());
    +    }
    +
    +    /**
    +     * Comment out ignore for integration tests (requires creds files)
    +     */
    +    @Test
    +    @Ignore
    --- End diff --
    
    Same as above.



[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-194975737
  
    @mans2singh I had a chance to sit down and revisit this.  Overall, it looks good and I was able to test a flow successfully putting to a Kinesis Firehose, which aggregated and dumped to S3.

    One thing mentioned in my initial review that we still need to cover is how we are handling batching.  I do think we need to handle that in a more constrained fashion given that file sizes can vary widely.  With how the processor is currently configured, it could hold up to 250MB in memory by default.  Instead, what would your thoughts be on converting this to a buffer size property?  If people want batching, they can specify a given memory size (perhaps something like 1 MB by default) and then we wait until that threshold is hit or no more input flowfiles are available, at which point they are sent off in a batch.  If batching is not desired, they can either leave the buffer property empty or specify 0 bytes.

    Thoughts on this approach?  Ultimately, we are trying to keep people from inadvertently causing heap exhaustion.  With the approach prescribed here, people can get as aggressive as they wish with batching while keeping a finitely constrained amount of space per instance.



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53849365
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL,
    +                  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +        final boolean batchSizeSet = validationContext.getProperty(BATCH_SIZE).isSet();
    +
    +        if ( batchSizeSet) {
    +           int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    +           if ( batchSize < 1 || batchSize > 500 ) {
    +                problems.add(new ValidationResult.Builder().input("Batch Size").valid(false).explanation("Batch size must be between 1 and 500 but was " + batchSize).build());
    +           }
    +        }
    +
    +        final boolean maxBufferIntervalIsSet = validationContext.getProperty(MAX_BUFFER_INTERVAL).isSet();
    +        if ( maxBufferIntervalIsSet) {
    +           int maxBufferInterval = validationContext.getProperty(MAX_BUFFER_INTERVAL).asInteger();
    +           if ( maxBufferInterval < 60 || maxBufferInterval > 900 ) {
    +                problems.add(new ValidationResult.Builder().input("Max Buffer Interval").valid(false)
    +                   .explanation("Max Buffer Interval must be between 60 and 900 seconds but was " + maxBufferInterval).build());
    +           }
    +        }
    +
    +        final boolean maxBufferSizeIsSet = validationContext.getProperty(MAX_BUFFER_SIZE).isSet();
    +        if ( maxBufferSizeIsSet) {
    +           int maxBufferSize = validationContext.getProperty(MAX_BUFFER_SIZE).asInteger();
    +           if ( maxBufferSize < 1 || maxBufferSize > 128 ) {
    +                problems.add(new ValidationResult.Builder().input("Max Buffer Size").valid(false).explanation("Max Buffer Size must be between 1 and 128 (mb) but was " + maxBufferSize).build());
    +           }
    +        }
    +        return problems;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        List<FlowFile> flowFiles = session.get(batchSize);
    +        if (flowFiles == null || flowFiles.size() == 0) {
    +            return;
    +        }
    +
    +        final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
    +        final AmazonKinesisFirehoseClient client = getClient();
    +
    +        try {
    +            List<Record> records = new ArrayList<>();
    +
    +            // Prepare batch of records
    +            for (int i = 0; i < flowFiles.size(); i++) {
    +                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    --- End diff --
    
    This section is a bit concerning, as we are taking the contents of the FlowFiles and putting them into memory, which is going to impact the heap.  According to the Kinesis docs (http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/AmazonKinesisFirehoseClient.html) it appears the maximum size a record can support is 1,000 kB raw (before encoding occurs).  Given the default batch size, even when constrained, that would be ~250MB of heap pressure per instance.

    A couple of points:

    - I'm not overly familiar with the Kinesis API, but is there a way to stream these items directly from the session so we don't have to load them into memory?
    - It would likely be worthwhile to also inspect each flowfile's size to ensure it does not exceed the supported 1,000 kB.  Doing some quick stepping through, it does not appear this is even detected client side (see the sketch below).
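
    A rough sketch of such a guard inside the batching loop, assuming the MAX_MESSAGE_SIZE
    constant (1000 * 1024) and error attribute introduced in a later revision of this PR;
    the error text itself is illustrative:

        if (flowFile.getSize() > MAX_MESSAGE_SIZE) {
            // record exceeds the 1,000 kB Firehose limit; route it to failure instead of batching it
            flowFile = session.putAttribute(flowFile, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE,
                    "record too large: " + flowFile.getSize() + " bytes (max " + MAX_MESSAGE_SIZE + ")");
            session.transfer(flowFile, REL_FAILURE);
            continue;
        }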



[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53893236
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL,
    +                  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    --- End diff --
    
    I've removed the custom validator code


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-195409880
  
    @apiri  - To make things simple, we get rid of the batch size and just get one flow file at a time on each invocation of onTrigger.  We store the flow files in memory until the max size limit is reached or no more are coming in, and then send the batch held in memory to Kinesis Firehose.  Do we have to do anything to "close out" the collected batch?


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-196476035
  
    Hey @mans2singh,
    
    Just wanted to see how the progress was going.  We are coming up on release time for 0.6.0 and need to start wrapping things up.  If this is something we can bring to a close in the next day or so, great; if not, can we push this off to 0.7.0?
    
    Thanks and let me know! 


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-197122905
  
    @aldrin - I've moved the check for buffer size and rebased as you had recommended.  I could not find the data size validator that you mentioned, though. Please let me know if you have any other recommendations.  Thanks


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-184862199
  
    @mans2singh Would you be able to rebase and squash these commits now that the proxy support has been included?


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53893209
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +
    +/**
    + * This class provides processor the base class for kinesis firehose
    + */
    +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
    +
    +    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
    +            .name("Amazon Kinesis Firehose Delivery Stream Name")
    +            .description("The name of kinesis firehose delivery stream")
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Max Buffer Interval")
    +            .description("Buffering interval for messages (between 60 and 900 seconds).")
    +            .defaultValue("60")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Buffer Size (MB)")
    +            .description("Buffering size for messages (between 1MB and 128MB).")
    +            .defaultValue("128")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    --- End diff --
    
    @apiri - I've corrected this based on your feedback.


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-195367460
  
    @mans2singh I think your logic for collecting the flowfiles should probably just grab individual files one at a time.  This continues until one of two scenarios is reached:  the max buffer size is met or exceeded (based on the size of each flowfile that has been collected), or there are no more files coming into the processor (one or more flowfiles have been collected but we have not yet eclipsed the max buffer size).  In either case, we would close out that collected batch of files and send them on their way much as you had before.
    
    In terms of the 250MB by default, this comes from the default batch size combined with the worst case scenario of each file being 1MB in size. Because each of these files is converted to a byte array for sending, they are all sitting on the heap at once.  In the event multiple instances of this processor are running, we could quickly consume some big chunks of the heap.  What I am proposing is to either get rid of the batch size (so that property no longer exists) or make it a secondary consideration: we try to receive a certain batch size, but first ensure we do not exceed the configured buffer and, second, that we do not exceed the batch size, with semantics similar to the scenarios above in terms of when those batches are sent on their way, plus when a certain batch size has been reached.
    
    Does that clarify a bit? 
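
    For concreteness, the collection loop being described might look something like this. It is a sketch only, not the final code; the BATCH_SIZE and MAX_MESSAGE_BUFFER_SIZE_MB property names are assumed to exist on the processor.

        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        final long maxBufferBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asInteger() * 1024L * 1000L;

        final List<FlowFile> flowFiles = new ArrayList<>(batchSize);
        long bufferedBytes = 0L;
        // Pull FlowFiles one at a time until the buffer is full, the batch size is hit,
        // or the incoming queue is empty.
        while (flowFiles.size() < batchSize && bufferedBytes < maxBufferBytes) {
            final FlowFile candidate = session.get();
            if (candidate == null) {
                break;
            }
            bufferedBytes += candidate.getSize();
            flowFiles.add(candidate);
        }
        if (flowFiles.isEmpty()) {
            return; // nothing to send on this trigger
        }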


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-197151921
  
    @mattyb149 has it.
    
    @mans2singh the updates look great.  Concerning the validator, how does 6bed3bcaad61c516eacf268400cdd03c0cd58dae look to you?
    
    Otherwise, I think we can send this one on its way.


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53845533
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL,
    +                  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +        final boolean batchSizeSet = validationContext.getProperty(BATCH_SIZE).isSet();
    +
    +        if ( batchSizeSet) {
    +           int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    +           if ( batchSize < 1 || batchSize > 500 ) {
    +                problems.add(new ValidationResult.Builder().input("Batch Size").valid(false).explanation("Batch size must be between 1 and 500 but was " + batchSize).build());
    +           }
    +        }
    +
    +        final boolean maxBufferIntervalIsSet = validationContext.getProperty(MAX_BUFFER_INTERVAL).isSet();
    +        if ( maxBufferIntervalIsSet) {
    +           int maxBufferInterval = validationContext.getProperty(MAX_BUFFER_INTERVAL).asInteger();
    +           if ( maxBufferInterval < 60 || maxBufferInterval > 900 ) {
    +                problems.add(new ValidationResult.Builder().input("Max Buffer Interval").valid(false)
    +                   .explanation("Max Buffer Interval must be between 60 and 900 seconds but was " + maxBufferInterval).build());
    +           }
    +        }
    +
    +        final boolean maxBufferSizeIsSet = validationContext.getProperty(MAX_BUFFER_SIZE).isSet();
    +        if ( maxBufferSizeIsSet) {
    +           int maxBufferSize = validationContext.getProperty(MAX_BUFFER_SIZE).asInteger();
    +           if ( maxBufferSize < 1 || maxBufferSize > 128 ) {
    +                problems.add(new ValidationResult.Builder().input("Max Buffer Size").valid(false).explanation("Max Buffer Size must be between 1 and 128 (mb) but was " + maxBufferSize).build());
    +           }
    +        }
    +        return problems;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        List<FlowFile> flowFiles = session.get(batchSize);
    +        if (flowFiles == null || flowFiles.size() == 0) {
    --- End diff --
    
    Minor:  this will never return null, only an empty collection when there is no input
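
    In other words, the guard can be reduced to something like:

        if (flowFiles.isEmpty()) {
            return;
        }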


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53845121
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +
    +/**
    + * This class provides processor the base class for kinesis firehose
    + */
    +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
    +
    +    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
    +            .name("Amazon Kinesis Firehose Delivery Stream Name")
    +            .description("The name of kinesis firehose delivery stream")
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Max Buffer Interval")
    +            .description("Buffering interval for messages (between 60 and 900 seconds).")
    +            .defaultValue("60")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Buffer Size (MB)")
    +            .description("Buffering size for messages (between 1MB and 128MB).")
    +            .defaultValue("128")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    --- End diff --
    
    I see this is used, but again, consider the aforementioned validator to enforce the range and remove the unneeded customValidate (see the sketch below)
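
    For example, a ranged validator on the descriptor itself removes the need for the customValidate block. This is only a sketch; the default value shown is illustrative, not the PR's value.

        public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
                .name("Batch Size")
                .description("Number of messages to send in a single PutRecordBatch call (between 1 and 500).")
                .defaultValue("250") // illustrative default
                .required(false)
                .addValidator(StandardValidators.createLongValidator(1, 500, true))
                .build();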


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53849994
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.util.List;
    +
    +import org.apache.nifi.processors.aws.s3.FetchS3Object;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    + */
    +public class ITPutKinesisFirehose {
    +
    +    private TestRunner runner;
    +    protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
    +        runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
    +        runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
    +        runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName");
    +        runner.assertValid();
    +    }
    +
    +    @After
    +    public void tearDown() throws Exception {
    +        runner = null;
    +    }
    +
    +    @Test
    +    public void testCustomValidateBatchSize1Valid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBatchSize500Valid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500");
    +        runner.assertValid();
    +    }
    +    @Test
    +    public void testCustomValidateBatchSize501InValid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferSize1Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferSize128Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128");
    +        runner.assertValid();
    +    }
    +    @Test
    +    public void testCustomValidateBufferSize129InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval900Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval60Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval901InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval59InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59");
    +        runner.assertNotValid();
    +    }
    +
    +    /**
    +     * Comment out ignore for integration tests (requires creds files)
    +     */
    +    @Test
    +    @Ignore
    --- End diff --
    
    The profile to run ITs is not enabled by default, so this Ignore is unneeded.  Would prefer to have it removed so that no code changes are needed to run them.


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-188062568
  
    Corrected title of pull request.  Please let me know if there is anything else required.


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-196956296
  
    @mans2singh Overall, the changes look good, and I appreciate the inclusion of some additional tests.  Going to do some functional testing, but I think we can get this merged in.  There are, unfortunately, a lot of conflicts, so along with the minor points mentioned in the review, would you also please get this rebased so we are good to go for a merge?
    
    Thanks!


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-186941406
  
    @apiri - Do you have any other comments/suggestions for me ?


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53893312
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.util.List;
    +
    +import org.apache.nifi.processors.aws.s3.FetchS3Object;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    + */
    +public class ITPutKinesisFirehose {
    +
    +    private TestRunner runner;
    +    protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
    +        runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
    +        runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
    +        runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName");
    +        runner.assertValid();
    +    }
    +
    +    @After
    +    public void tearDown() throws Exception {
    +        runner = null;
    +    }
    +
    +    @Test
    +    public void testCustomValidateBatchSize1Valid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBatchSize500Valid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500");
    +        runner.assertValid();
    +    }
    +    @Test
    +    public void testCustomValidateBatchSize501InValid() {
    +        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferSize1Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferSize128Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128");
    +        runner.assertValid();
    +    }
    +    @Test
    +    public void testCustomValidateBufferSize129InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval900Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval60Valid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60");
    +        runner.assertValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval901InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901");
    +        runner.assertNotValid();
    +    }
    +
    +    @Test
    +    public void testCustomValidateBufferInterval59InValid() {
    +        runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59");
    +        runner.assertNotValid();
    +    }
    +
    +    /**
    +     * Comment out ignore for integration tests (requires creds files)
    +     */
    +    @Test
    +    @Ignore
    --- End diff --
    
    Removed ignore annotation for IT tests


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53893190
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +
    +/**
    + * This class provides processor the base class for kinesis firehose
    + */
    +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
    +
    +    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
    +            .name("Amazon Kinesis Firehose Delivery Stream Name")
    +            .description("The name of kinesis firehose delivery stream")
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder()
    +            .name("Max Buffer Interval")
    +            .description("Buffering interval for messages (between 60 and 900 seconds).")
    +            .defaultValue("60")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    --- End diff --
    
    @apiri - The max buffer size and interval are configuration properties and, as you correctly pointed out, are not used in the client. I've removed them and also updated the validators to use createLongValidator.


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-185913781
  
    Just wanted to check if there is any other feedback or recommendation on the Kinesis Firehose put processor.


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-184885151
  
    @apiri I've rebased the forked branch.  Thanks  


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53849876
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.util.List;
    +
    +import org.apache.nifi.processors.aws.s3.FetchS3Object;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +/**
    + * This test contains both unit and integration test (integration tests are ignored by default)
    + */
    +public class ITPutKinesisFirehose {
    --- End diff --
    
    customValidate tests and similar, if still needed or desired, should go into a standard TestPutKinesisFirehose class so that they are run on each build
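
    A minimal shape for that class might be the following (a sketch only, reusing the mock-framework classes already present in the IT class):

        public class TestPutKinesisFirehose {

            private TestRunner runner;

            @Before
            public void setUp() {
                runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
                runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
                runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
                runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName");
            }

            @Test
            public void testBatchSizeGreaterThan500IsInvalid() {
                runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
                runner.assertNotValid();
            }
        }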


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r56201575
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    /**
    +     * Max buffer size 1000kb
    +     */
    +    public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        FlowFile flowFileCandidate = session.get();
    +        if ( flowFileCandidate == null )
    +            return;
    +
    +        long currentBufferSizeBytes = flowFileCandidate.getSize();
    +
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final int maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asInteger() * 1024 * 1000;
    +
    +        // Get max batch size messages with size limit defined by maxBufferSizeBytes
    +        List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
    +        flowFiles.add(flowFileCandidate);
    +        for (int i = 1; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
    +            flowFileCandidate = session.get();
    +            if ( flowFileCandidate == null )
    +                break;
    +            currentBufferSizeBytes += flowFileCandidate.getSize();
    +            flowFiles.add(flowFileCandidate);
    +        }
    +
    +        final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
    +        final AmazonKinesisFirehoseClient client = getClient();
    +
    +        try {
    +            List<Record> records = new ArrayList<>();
    +
    +            List<FlowFile> failedFlowFiles = new ArrayList<>();
    +            List<FlowFile> successfulFlowFiles = new ArrayList<>();
    +
    +            // Prepare batch of records
    +            for (int i = 0; i < flowFiles.size(); i++) {
    +                FlowFile flowFile = flowFiles.get(i);
    +
    +                // If flow file too large send it to failure
    --- End diff --
    
    This condition check should still happen before the above buffering considerations.
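
    That is, the oversize check could live inside the collection loop itself, so an over-limit FlowFile is routed to failure before it counts against the buffer. The restructuring below is illustrative only, reusing the names from the diff above.

        FlowFile candidate;
        while (flowFiles.size() < batchSize && currentBufferSizeBytes <= maxBufferSizeBytes
                && (candidate = session.get()) != null) {
            if (candidate.getSize() > MAX_MESSAGE_SIZE) {
                // too large for a single Firehose record; fail it immediately
                session.transfer(candidate, REL_FAILURE);
                continue;
            }
            currentBufferSizeBytes += candidate.getSize();
            flowFiles.add(candidate);
        }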


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-195172976
  
    @apiri Your recommendation for using a buffer size is great. So, should we add another property, total batch bytes, and then send records in batches of that size? If you have any other suggestions/pointers on how this can be implemented, please let me know and I will work on it.  Thanks again for your time.


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-195410447
  
    Nope, just send it off to Kinesis and then transfer the files to the appropriate relationship afterward.
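
    In code, that tail end might look roughly like this (a sketch built from the SDK classes already imported in the PR; REL_SUCCESS and REL_FAILURE are assumed from the AWS base processor, and error handling is trimmed for brevity):

        // Build one Record per FlowFile and send the whole batch in a single call.
        final List<Record> records = new ArrayList<>();
        for (final FlowFile flowFile : flowFiles) {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            session.exportTo(flowFile, baos);
            records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
        }

        final PutRecordBatchResult result = client.putRecordBatch(new PutRecordBatchRequest()
                .withDeliveryStreamName(firehoseStreamName)
                .withRecords(records));

        // Route each FlowFile based on the matching response entry.
        final List<PutRecordBatchResponseEntry> responses = result.getRequestResponses();
        for (int i = 0; i < responses.size(); i++) {
            FlowFile flowFile = flowFiles.get(i);
            final PutRecordBatchResponseEntry entry = responses.get(i);
            if (StringUtils.isNotBlank(entry.getErrorCode())) {
                flowFile = session.putAttribute(flowFile, AWS_KINESIS_FIREHOSE_ERROR_CODE, entry.getErrorCode());
                session.transfer(flowFile, REL_FAILURE);
            } else {
                flowFile = session.putAttribute(flowFile, AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
                session.transfer(flowFile, REL_SUCCESS);
            }
        }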


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-197000303
  
    Looking good against AWS endpoints and services.  Good stuff.


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/213


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53893255
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL,
    +                  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    --- End diff --
    
    I've removed them (these were configuration parameters that I mistakenly left in the client code).


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r56212428
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +
    +/**
    + * This class provides processor the base class for kinesis firehose
    + */
    +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
    +
    +    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
    +            .name("Amazon Kinesis Firehose Delivery Stream Name")
    +            .description("The name of kinesis firehose delivery stream")
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("Batch size for messages (1-500).")
    +            .defaultValue("250")
    +            .required(false)
    +            .addValidator(StandardValidators.createLongValidator(1, 500, true))
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
    +            .name("Max message buffer size (MB)")
    +            .description("Max message buffer size (1-50) MB")
    +            .defaultValue("1")
    +            .required(false)
    +            .addValidator(StandardValidators.createLongValidator(1, 50, true))
    --- End diff --
    
    Likely want to prefer the DATA_SIZE_VALIDATOR here.  Not sure we need to strictly cap this, since we are guiding users toward an appropriate path, and if they have conditions (and supporting hardware) where they wish to go above the maximum, that should be fine.
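    
    For illustration, a minimal sketch of what that could look like (the property name, description, and default value below are assumptions for this sketch, not the final API):
    
        public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE = new PropertyDescriptor.Builder()
                .name("Max message buffer size")
                .description("Soft limit on the combined size of flow files buffered before a batch is sent")
                .defaultValue("1 MB")
                .required(false)
                .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
                .build();
    
        // Reading it back, e.g. in onTrigger:
        // final long maxBufferBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
    
    That way the user can express the limit as "500 KB", "1 MB", "50 MB", and so on, without a hard upper bound.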


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r56212500
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,180 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    /**
    +     * Max buffer size 1000kb
    --- End diff --
    
    Now that we have a buffer, this should likely be called the max message size instead.
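    
    For example (the constant name and comment are just a suggestion):
    
        /**
         * Maximum size of a single message/record; Firehose caps each record at 1,000 KB,
         * so this is a per-message limit rather than the overall buffer size.
         */
        public static final int MAX_MESSAGE_SIZE = 1000 * 1024;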


---

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-195300721
  
    @apiri - Just wanted to clarify: we will grab a batch of flow files and check whether their combined size has reached the threshold. If it has, we send the batch to the success relationship; if not, we send the flow files back to the work queue with the transfer method you mentioned. This might cause additional delays if the user's threshold is not reached for some time, either because the flow files are too small or because the rate at which flow files arrive is low.
    No worries about the time - I know you are busy, and I really appreciate your time and advice in helping me work on this project.


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/213#issuecomment-186944932
  
    Hi Mans,
    
    Been a bit busy, will certainly get to the full review this week.  Thanks for rebasing.


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by mans2singh <gi...@git.apache.org>.
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53893277
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL,
    +                  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    +                  PROXY_HOST,PROXY_HOST_PORT));
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
    +        final boolean batchSizeSet = validationContext.getProperty(BATCH_SIZE).isSet();
    +
    +        if ( batchSizeSet) {
    +           int batchSize = validationContext.getProperty(BATCH_SIZE).asInteger();
    +           if ( batchSize < 1 || batchSize > 500 ) {
    +                problems.add(new ValidationResult.Builder().input("Batch Size").valid(false).explanation("Batch size must be between 1 and 500 but was " + batchSize).build());
    +           }
    +        }
    +
    +        final boolean maxBufferIntervalIsSet = validationContext.getProperty(MAX_BUFFER_INTERVAL).isSet();
    +        if ( maxBufferIntervalIsSet) {
    +           int maxBufferInterval = validationContext.getProperty(MAX_BUFFER_INTERVAL).asInteger();
    +           if ( maxBufferInterval < 60 || maxBufferInterval > 900 ) {
    +                problems.add(new ValidationResult.Builder().input("Max Buffer Interval").valid(false)
    +                   .explanation("Max Buffer Interval must be between 60 and 900 seconds but was " + maxBufferInterval).build());
    +           }
    +        }
    +
    +        final boolean maxBufferSizeIsSet = validationContext.getProperty(MAX_BUFFER_SIZE).isSet();
    +        if ( maxBufferSizeIsSet) {
    +           int maxBufferSize = validationContext.getProperty(MAX_BUFFER_SIZE).asInteger();
    +           if ( maxBufferSize < 1 || maxBufferSize > 128 ) {
    +                problems.add(new ValidationResult.Builder().input("Max Buffer Size").valid(false).explanation("Max Buffer Size must be between 1 and 128 (mb) but was " + maxBufferSize).build());
    +           }
    +        }
    +        return problems;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        List<FlowFile> flowFiles = session.get(batchSize);
    +        if (flowFiles == null || flowFiles.size() == 0) {
    --- End diff --
    
    Corrected per your recommendation.


---

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/213#discussion_r53845261
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.kinesis.firehose;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    +import com.amazonaws.services.kinesisfirehose.model.Record;
    +
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
    +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
    +    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
    +    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
    +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
    +
    +    /**
    +     * Kinesis put record response error message
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    +
    +    /**
    +     * Kinesis put record response error code
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    +
    +    /**
    +     * Kinesis put record response record id
    +     */
    +    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL,
    +                  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
    --- End diff --
    
    If MAX_BUFFER_INTERVAL and MAX_BUFFER_SIZE are to go unused, they will also need to be removed from here.
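    
    That is, the list would shrink to roughly this (a sketch, assuming the two buffer properties are dropped entirely):
    
        public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
                Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
                        CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));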


---