You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jw...@apache.org on 2016/10/12 03:42:52 UTC

nifi git commit: Nifi-1540 - AWS Kinesis Streams put processor

Repository: nifi
Updated Branches:
  refs/heads/master 6d5f4777c -> af2717932


Nifi-1540 - AWS Kinesis Streams put processor

This closes #239.

Signed-off-by: James Wing <jv...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af271793
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af271793
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af271793

Branch: refs/heads/master
Commit: af27179322a29259fb831bbc20f84f70c415ef56
Parents: 6d5f477
Author: mans2singh <ma...@yahoo.com>
Authored: Sun Oct 9 17:42:25 2016 -0700
Committer: James Wing <jv...@gmail.com>
Committed: Tue Oct 11 20:21:29 2016 -0700

----------------------------------------------------------------------
 .../kinesis/AbstractBaseKinesisProcessor.java   |  98 +++++
 .../AbstractKinesisFirehoseProcessor.java       |   6 +-
 .../kinesis/firehose/PutKinesisFirehose.java    |  31 +-
 .../stream/AbstractKinesisStreamProcessor.java  |  65 +++
 .../aws/kinesis/stream/PutKinesisStream.java    | 174 ++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../aws/kinesis/stream/ITPutKinesisStream.java  | 430 +++++++++++++++++++
 .../kinesis/stream/TestPutKinesisStream.java    |  82 ++++
 8 files changed, 855 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/af271793/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
new file mode 100644
index 0000000..e559820
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.AmazonWebServiceClient;
+
+/**
+ * This class provides processor the base class for kinesis client
+ */
+public abstract class AbstractBaseKinesisProcessor<ClientType extends AmazonWebServiceClient>
+    extends AbstractAWSCredentialsProviderProcessor<ClientType> {
+
+    /**
+     * Kinesis put record response error message
+     */
+    public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .displayName("Message Batch Size")
+            .name("message-batch-size")
+            .description("Batch size for messages (1-500).")
+            .defaultValue("250")
+            .required(false)
+            .addValidator(StandardValidators.createLongValidator(1, 500, true))
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
+            .name("max-message-buffer-size")
+            .displayName("Max message buffer size (MB)")
+            .description("Max message buffer size in Mega-bytes")
+            .defaultValue("1 MB")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Max buffer size 1 MB
+     */
+    public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
+
+    protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate,
+            final String streamName, String message) {
+        flowFileCandidate = session.putAttribute(flowFileCandidate, message,
+            "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE );
+        session.transfer(flowFileCandidate, REL_FAILURE);
+        getLogger().error("Failed to publish to kinesis {} records {} because the size was greater than {} bytes",
+            new Object[]{streamName, flowFileCandidate, MAX_MESSAGE_SIZE});
+        return flowFileCandidate;
+    }
+
+    protected List<FlowFile> filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, final String streamName, String message) {
+        List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
+
+        long currentBufferSizeBytes = 0;
+
+        for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
+
+            FlowFile flowFileCandidate = session.get();
+            if ( flowFileCandidate == null )
+                break;
+
+            if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
+                flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, streamName, message);
+                continue;
+            }
+
+            currentBufferSizeBytes += flowFileCandidate.getSize();
+
+            flowFiles.add(flowFileCandidate);
+        }
+        return flowFiles;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/af271793/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
index ddc6e6c..ca15653 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
@@ -19,7 +19,7 @@ package org.apache.nifi.processors.aws.kinesis.firehose;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
@@ -29,7 +29,7 @@ import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
 /**
  * This class provides processor the base class for kinesis firehose
  */
-public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
+public abstract class AbstractKinesisFirehoseProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisFirehoseClient> {
 
     public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
             .name("Amazon Kinesis Firehose Delivery Stream Name")
@@ -68,7 +68,7 @@ public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCreden
     }
 
     /**
-     * Create client using AWSCredentials
+     * Create client using AWSCredentails
      *
      * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/af271793/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
index f5c3b9f..8abc965 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
@@ -91,25 +91,8 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
         final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
         final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
 
-        List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
-
-        long currentBufferSizeBytes = 0;
-
-        for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) {
-
-            FlowFile flowFileCandidate = session.get();
-            if ( flowFileCandidate == null )
-                break;
-
-            if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
-                flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, firehoseStreamName);
-                continue;
-            }
-
-            currentBufferSizeBytes += flowFileCandidate.getSize();
-
-            flowFiles.add(flowFileCandidate);
-        }
+        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, firehoseStreamName,
+            AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
 
         final AmazonKinesisFirehoseClient client = getClient();
 
@@ -172,14 +155,4 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
         }
     }
 
-    protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate,
-            final String firehoseStreamName) {
-        flowFileCandidate = session.putAttribute(flowFileCandidate, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE,
-            "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE );
-        session.transfer(flowFileCandidate, REL_FAILURE);
-        getLogger().error("Failed to publish to kinesis firehose {} records {} because the size was greater than {} bytes",
-            new Object[]{firehoseStreamName, flowFileCandidate, MAX_MESSAGE_SIZE});
-        return flowFileCandidate;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/af271793/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
new file mode 100644
index 0000000..b7513af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+
+/**
+ * This class provides processor the base class for kinesis client
+ */
+public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesisProcessor<AmazonKinesisClient> {
+
+    public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder()
+            .name("kinesis-stream-name")
+            .displayName("Amazon Kinesis Stream Name")
+            .description("The name of Kinesis Stream")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Create client using aws credentials provider. This is the preferred way for creating clients
+     */
+    @Override
+    protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials provider");
+
+        return new AmazonKinesisClient(credentialsProvider, config);
+    }
+
+    /**
+     * Create client using AWSCredentails
+     *
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials");
+
+        return new AmazonKinesisClient(credentials, config);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/af271793/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
new file mode 100644
index 0000000..cafc82c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis. "
+    + "In order to send data to Kinesis, the stream name has to be specified.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "aws.kinesis.error.message", description = "Error message on posting message to AWS Kinesis"),
+    @WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"),
+    @WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"),
+    @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")})
+public class PutKinesisStream extends AbstractKinesisStreamProcessor {
+
+    /**
+     * Kinesis put record response error code
+     */
+    public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
+
+    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
+
+    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
+
+    public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder()
+        .displayName("Amazon Kinesis Stream Partition Key")
+        .name("amazon-kinesis-stream-partition-key")
+        .description("The partition key attribute.  If it is not set, a random value is used")
+        .expressionLanguageSupported(true)
+        .defaultValue("${kinesis.partition.key}")
+        .required(false)
+        .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
+                AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST,PROXY_HOST_PORT));
+
+    /** A random number generator for cases where partition key is not available */
+    protected Random randomParitionKeyGenerator = new Random();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
+        final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue();
+
+        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName,
+           AWS_KINESIS_ERROR_MESSAGE);
+
+        final AmazonKinesisClient client = getClient();
+
+        try {
+            List<PutRecordsRequestEntry> records = new ArrayList<>();
+
+            List<FlowFile> failedFlowFiles = new ArrayList<>();
+            List<FlowFile> successfulFlowFiles = new ArrayList<>();
+
+            // Prepare batch of records
+            for (int i = 0; i < flowFiles.size(); i++) {
+                FlowFile flowFile = flowFiles.get(i);
+
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                session.exportTo(flowFile, baos);
+                PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
+
+                String partitionKey = context.getProperty(PutKinesisStream.KINESIS_PARTITION_KEY)
+                    .evaluateAttributeExpressions(flowFiles.get(i)).getValue();
+
+                if ( ! StringUtils.isBlank(partitionKey) ) {
+                    record.setPartitionKey(partitionKey);
+                } else {
+                    record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt()));
+                }
+
+                records.add(record);
+            }
+
+            if ( records.size() > 0 ) {
+
+                PutRecordsRequest putRecordRequest = new PutRecordsRequest();
+                putRecordRequest.setStreamName(streamName);
+                putRecordRequest.setRecords(records);
+                PutRecordsResult results = client.putRecords(putRecordRequest);
+
+                List<PutRecordsResultEntry> responseEntries = results.getRecords();
+                for (int i = 0; i < responseEntries.size(); i++ ) {
+                    PutRecordsResultEntry entry = responseEntries.get(i);
+                    FlowFile flowFile = flowFiles.get(i);
+
+                    Map<String,String> attributes = new HashMap<>();
+                    attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId());
+                    attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber());
+
+                    if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
+                        attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode());
+                        attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage());
+                        flowFile = session.putAllAttributes(flowFile, attributes);
+                        failedFlowFiles.add(flowFile);
+                    } else {
+                        flowFile = session.putAllAttributes(flowFile, attributes);
+                        successfulFlowFiles.add(flowFile);
+                    }
+                }
+                if ( failedFlowFiles.size() > 0 ) {
+                    session.transfer(failedFlowFiles, REL_FAILURE);
+                    getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles});
+                }
+                if ( successfulFlowFiles.size() > 0 ) {
+                    session.transfer(successfulFlowFiles, REL_SUCCESS);
+                    getLogger().debug("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles});
+                }
+                records.clear();
+            }
+
+        } catch (final Exception exception) {
+            getLogger().error("Failed to publish due to exception {} to kinesis {} flowfiles {} ", new Object[]{exception, streamName, flowFiles});
+            session.transfer(flowFiles, REL_FAILURE);
+            context.yield();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/af271793/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d8c18fc..df265c3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -25,3 +25,4 @@ org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
 org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
 org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
 org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
+org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream

http://git-wip-us.apache.org/repos/asf/nifi/blob/af271793/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java
new file mode 100644
index 0000000..b195423
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test contains both unit and integration test (integration tests are ignored by default).
+ * Running integration tests may result in failures due to provisioned capacity of Kinesis stream based on number of shards.
+ * The following integration tests run successfully with 10 shards.  If increasing shards is not a possiblity, please reduce the size and
+ * number of messages in the integration tests based AWS Kinesis provisioning pages calculations.
+ */
+public class ITPutKinesisStream {
+
+    private TestRunner runner;
+    protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
+
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisStream.class);
+        runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "kstream");
+        runner.setProperty(PutKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.assertValid();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        runner = null;
+    }
+
+    /**
+     * Comment out ignore for integration tests (requires creds files)
+     */
+    @Test
+    public void testIntegrationSuccess() throws Exception {
+        runner.setProperty(PutKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.assertValid();
+
+        runner.enqueue("test".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        final MockFlowFile out = ffs.iterator().next();
+
+        out.assertContentEquals("test".getBytes());
+    }
+
+    @Test
+    public void testIntegrationWithFixedPartitionSuccess() throws Exception {
+        runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "pfixed");
+        runner.assertValid();
+
+        runner.enqueue("test".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        final MockFlowFile out = ffs.iterator().next();
+
+        out.assertContentEquals("test".getBytes());
+    }
+
+    @Test
+    public void testIntegrationWithDynamicPartitionSuccess() throws Exception {
+        runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${parition}");
+        runner.assertValid();
+        Map<String,String> properties = new HashMap<>();
+        properties.put("partition", "px");
+        runner.enqueue("test".getBytes(), properties);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        final MockFlowFile out = ffs.iterator().next();
+
+        out.assertContentEquals("test".getBytes());
+    }
+
+    /**
+     * Comment out ignore for integration tests (requires creds files)
+     */
+    @Test
+    public void testIntegrationFailedBadStreamName() throws Exception {
+        runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "bad-kstream");
+        runner.assertValid();
+
+        runner.enqueue("test".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1);
+
+    }
+
+    @Test
+    public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "2");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(1,flowFiles.size());
+    }
+
+    @Test
+    public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneWithParameterPartitionSuccess() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "2");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${partitionKey}");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        Map<String,String> props = new HashMap<>();
+        props.put("partitionKey", "p1");
+        runner.enqueue(bytes,props);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(1,flowFiles.size());
+    }
+
+    @Test
+    public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "5");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "2 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+    }
+
+    @Test
+    public void testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+    }
+
+    @Test
+    public void testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+    }
+
+    @Test
+    public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "2");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.run(2, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 3);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(3,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+    }
+
+    @Test
+    public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMessageSuccessOneFailed() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue("hello".getBytes());
+        runner.enqueue(bytes);
+        runner.enqueue("there".getBytes());
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+
+        List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
+        assertEquals(1,flowFilesFailed.size());
+        for (MockFlowFile flowFileFailed : flowFilesFailed) {
+            assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
+        }
+    }
+
+    @Test
+    public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOneFailed() throws Exception {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue("hello".getBytes());
+        runner.enqueue(bytes);
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(1,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+            flowFile.assertContentEquals("hello".getBytes());
+        }
+
+        List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
+        assertEquals(1,flowFilesFailed.size());
+        for (MockFlowFile flowFileFailed : flowFilesFailed) {
+            assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
+        }
+    }
+
+    @Test
+    public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue("HelloWorld".getBytes());
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(1,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+            flowFile.assertContentEquals("HelloWorld".getBytes());
+        }
+
+        List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
+        assertEquals(1,flowFilesFailed.size());
+        for (MockFlowFile flowFileFailed : flowFilesFailed) {
+            assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
+        }
+    }
+
+    @Test
+    public void testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        runner.enqueue("Hello".getBytes());
+        runner.enqueue("World".getBytes());
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+        flowFiles.get(0).assertContentEquals("Hello".getBytes());
+        flowFiles.get(1).assertContentEquals("World".getBytes());
+
+        List<MockFlowFile> flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
+        assertEquals(0,flowFilesFailed.size());
+    }
+
+    @Test
+    public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "5");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+    }
+
+    @Test
+    public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[10];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 5);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(5,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+    }
+
+    @Test
+    public void test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "2");
+        runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB");
+        runner.assertValid();
+        byte [] bytes = new byte[10];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER);
+            flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/af271793/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java
new file mode 100644
index 0000000..29ca4f0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.stream;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.List;
+
+import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPutKinesisStream {
+    private TestRunner runner;
+    protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
+
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisStream.class);
+        runner.setProperty(PutKinesisStream.ACCESS_KEY, "abcd");
+        runner.setProperty(PutKinesisStream.SECRET_KEY, "secret key");
+        runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "kstream");
+        runner.assertValid();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        runner = null;
+    }
+
+    @Test
+    public void testCustomValidateBatchSize1Valid() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "1");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testCustomValidateBatchSize500Valid() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "500");
+        runner.assertValid();
+    }
+    @Test
+    public void testCustomValidateBatchSize501InValid() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "501");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testWithSizeGreaterThan1MB() {
+        runner.setProperty(PutKinesisStream.BATCH_SIZE, "1");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE + 1)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE);
+
+        assertNotNull(flowFiles.get(0).getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE));
+    }
+}