Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/03/02 22:40:17 UTC

[GitHub] [nifi] turcsanyip commented on a change in pull request #5761: NIFI-9669 Adding PutDynamoDBRecord processor

turcsanyip commented on a change in pull request #5761:
URL: https://github.com/apache/nifi/pull/5761#discussion_r818007725



##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");

Review comment:
       It should be mentioned here or in the Additional Details that this strategy only works when the partition key is of type `String`.
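       For context, a hypothetical sketch (not code from the PR) of why the restriction exists: a generated UUID can only ever be attached to the Item as a String value, so a table whose partition key is of type Number or Binary would reject the write.

    ```java
    import com.amazonaws.services.dynamodbv2.document.Item;
    import java.util.UUID;

    // Hypothetical sketch of the "Generated UUID" strategy, assuming the item is built
    // with the DynamoDB document API already used by this processor. The generated value
    // is always a String, hence the String-only partition key limitation worth documenting.
    class GeneratedPartitionKeySketch {
        Item buildItem(final String partitionKeyField) {
            return new Item().withString(partitionKeyField, UUID.randomUUID().toString());
        }
    }
    ```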

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {

Review comment:
       `result.getThrowable().getCause()` is used a couple of times. I'd consider using a local variable; the code would be more readable.
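   Something along these lines, as an illustrative sketch of the suggested refactor rather than the final code:

    ```java
    import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;

    // Sketch only: extracting the cause into a local variable avoids repeating
    // result.getThrowable().getCause(), and since instanceof is false for null,
    // the explicit null check also becomes unnecessary.
    class CauseCheckSketch {
        boolean isThroughputExceeded(final Throwable throwable) {
            final Throwable cause = throwable.getCause();
            return cause instanceof ProvisionedThroughputExceededException;
        }
    }
    ```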

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
##########
@@ -0,0 +1,258 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutDynamoDBRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <h2>Description</h2>
+
+    <p>
+        <i>PutDynamoDBRecord</i> intends to provide the capability to insert multiple Items into a DynamoDB table from a record-oriented FlowFile.
+        Compared to the <i>PutDynamoDB</i>, this processor is capable to process data based other than JSON format too and prepared to add multiple fields for a given Item.
+        Also, <i>PutDynamoDBRecord</i> is designed to insert bigger batches of data into the database.
+    </p>
+
+    <h2>Data types</h2>
+
+    <p>
+        The list data types supported by DynamoDB does not fully overlaps with the capabilities of the Record data structure.
+        Some conversions and simplifications are necessary during inserting the data. These are:
+    </p>
+
+    <ul>
+        <li>Numeric values are stored using a floating-point data structure within Items. In some cases this representation might cause issues with the accuracy.</li>
+        <li>Char is not a supported type within DynamoDB, these fields are converted into String values.</li>
+        <li>Enum types are stored as String fields, using the name of the given enum.</li>
+        <li>DynamoDB stores time and date related information as Strings.</li>
+        <li>Internal record structures are converted into maps.</li>
+        <li>Choice is not a supported data type, regardless of the actual wrapped data type, values enveloped in Choice are handled as Strings.</li>
+        <li>Unknown data types are handles as stings.</li>
+    </ul>
+
+    <h2>Limitations</h2>
+
+    <p>
+        Working with DynamoDB when batch inserting comes with two inherit limitations. First, the number of inserted Items is limited to 25 in any case.
+        In order to overcome this, during one execution, depending on the number or records in the incoming FlowFile, <i>PutDynamoDBRecord</i> might attempt multiple
+        insert calls towards the database server. Using this approach, the flow does not have to work with this limitation in most cases.
+    </p>
+
+    <p>
+        Having multiple external actions comes with the risk of having an unforeseen result at one of the steps.
+        For example when the incoming FlowFile is consists of 70 records, it will be split into 3 chunks, with a single insert operation for every chunk.
+        The first two chunks contains 25 Items to insert per chunk, and the third contains the remaining 20. In some cases it might occur that the first two insert operation succeeds but the third one fails.
+        In these cases we consider the FlowFile "partially processed" and we will transfer it to the "failure" or "unprocessed" Relationship according to the nature of the issue.
+        In order to keep the information about the successfully processed chunks the processor assigns the <i>"dynamodb.chunks.processed"</i> attribute to the FlowFile, which has the number of successfully processed chunks as value.
+    </p>
+
+    <p>
+        The most common reason for this behaviour comes from the other limitation the inserts have with DynamoDB: the database has a build in supervision over the amount of inserted data.
+        When a client reaches the "throughput limit", the server refuses to process the insert request until a certain amount of time. More information on this might be find <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html">here</a>.
+        From the perspective of the <i>PutDynamoDBRecord</i> we consider these cases as temporary issues and the FlowFile will be transferred to the "unprocessed" Relationship after which the processor will yield in order to avoid further throughput issues.
+        (Other kinds of failures will result transfer to the "failure" Relationship)
+    </p>
+
+    <h2>Retry</h2>
+
+    <p>
+        It is suggested to loop back the "unprocessed" Relationship to the <i>PutDynamoDBRecord</i> in some way. FlowFiles transferred to that relationship considered as healthy ones might be successfully processed in a later point.
+        It is possible that the FlowFile contains such a high number of records, what needs more than two attempts to fully insert.
+        The attribute "dynamodb.chunks.processed" is "rolled" through the attempts, which means, after each trigger it will contain the sum number of inserted chunks making it possible for the later attempts to continue from the right point without duplicated inserts.
+    </p>
+
+    <h2>Partition and sort keys</h2>
+
+    <p>
+        The processor supports multiple strategies for assigning partition key and sort key to the inserted Items. These are:
+    </p>
+
+    <h3>Partition Key Strategies</h3>
+
+    <h4>Partition By Field</h4>
+
+    <p>
+        The processors assigns one of the record field as partition key. The name of the record field is specified by the "Partition Key Field" attribute and the value will be the value of the record field with the same name.

Review comment:
       Typos:
   
   - `one of the record fields`
   - `"Partition Key Field" property`

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
##########
@@ -0,0 +1,258 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutDynamoDBRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <h2>Description</h2>
+
+    <p>
+        <i>PutDynamoDBRecord</i> intends to provide the capability to insert multiple Items into a DynamoDB table from a record-oriented FlowFile.
+        Compared to the <i>PutDynamoDB</i>, this processor is capable to process data based other than JSON format too and prepared to add multiple fields for a given Item.
+        Also, <i>PutDynamoDBRecord</i> is designed to insert bigger batches of data into the database.
+    </p>
+
+    <h2>Data types</h2>
+
+    <p>
+        The list data types supported by DynamoDB does not fully overlaps with the capabilities of the Record data structure.
+        Some conversions and simplifications are necessary during inserting the data. These are:
+    </p>
+
+    <ul>
+        <li>Numeric values are stored using a floating-point data structure within Items. In some cases this representation might cause issues with the accuracy.</li>
+        <li>Char is not a supported type within DynamoDB, these fields are converted into String values.</li>
+        <li>Enum types are stored as String fields, using the name of the given enum.</li>
+        <li>DynamoDB stores time and date related information as Strings.</li>
+        <li>Internal record structures are converted into maps.</li>
+        <li>Choice is not a supported data type, regardless of the actual wrapped data type, values enveloped in Choice are handled as Strings.</li>
+        <li>Unknown data types are handles as stings.</li>
+    </ul>
+
+    <h2>Limitations</h2>
+
+    <p>
+        Working with DynamoDB when batch inserting comes with two inherit limitations. First, the number of inserted Items is limited to 25 in any case.
+        In order to overcome this, during one execution, depending on the number or records in the incoming FlowFile, <i>PutDynamoDBRecord</i> might attempt multiple
+        insert calls towards the database server. Using this approach, the flow does not have to work with this limitation in most cases.
+    </p>
+
+    <p>
+        Having multiple external actions comes with the risk of having an unforeseen result at one of the steps.
+        For example when the incoming FlowFile is consists of 70 records, it will be split into 3 chunks, with a single insert operation for every chunk.
+        The first two chunks contains 25 Items to insert per chunk, and the third contains the remaining 20. In some cases it might occur that the first two insert operation succeeds but the third one fails.
+        In these cases we consider the FlowFile "partially processed" and we will transfer it to the "failure" or "unprocessed" Relationship according to the nature of the issue.
+        In order to keep the information about the successfully processed chunks the processor assigns the <i>"dynamodb.chunks.processed"</i> attribute to the FlowFile, which has the number of successfully processed chunks as value.
+    </p>
+
+    <p>
+        The most common reason for this behaviour comes from the other limitation the inserts have with DynamoDB: the database has a build in supervision over the amount of inserted data.
+        When a client reaches the "throughput limit", the server refuses to process the insert request until a certain amount of time. More information on this might be find <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html">here</a>.
+        From the perspective of the <i>PutDynamoDBRecord</i> we consider these cases as temporary issues and the FlowFile will be transferred to the "unprocessed" Relationship after which the processor will yield in order to avoid further throughput issues.
+        (Other kinds of failures will result transfer to the "failure" Relationship)
+    </p>
+
+    <h2>Retry</h2>
+
+    <p>
+        It is suggested to loop back the "unprocessed" Relationship to the <i>PutDynamoDBRecord</i> in some way. FlowFiles transferred to that relationship considered as healthy ones might be successfully processed in a later point.
+        It is possible that the FlowFile contains such a high number of records, what needs more than two attempts to fully insert.
+        The attribute "dynamodb.chunks.processed" is "rolled" through the attempts, which means, after each trigger it will contain the sum number of inserted chunks making it possible for the later attempts to continue from the right point without duplicated inserts.
+    </p>
+
+    <h2>Partition and sort keys</h2>
+
+    <p>
+        The processor supports multiple strategies for assigning partition key and sort key to the inserted Items. These are:
+    </p>
+
+    <h3>Partition Key Strategies</h3>
+
+    <h4>Partition By Field</h4>
+
+    <p>
+        The processors assigns one of the record field as partition key. The name of the record field is specified by the "Partition Key Field" attribute and the value will be the value of the record field with the same name.
+    </p>
+
+    <h4>Partition By Attribute</h4>
+
+    <p>
+        The processor assigns the value of a FlowFile attribute as partition key. With this strategy all the Items within a FlowFile will share the same partition case and it is suggested to use a sort key in order to meet the primary key requirements of the DynamoDB.

Review comment:
       Maybe:
   `it is suggested to use a sort key` => `it is suggested to use for tables also having a sort key`

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {
+            /**

Review comment:
       This is not a javadoc comment.
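   In other words, a plain comment would be expected inside the method body, e.g. (illustrative wording, not the PR's text):

    ```java
    // Javadoc-style comments (starting with /**) are reserved for declarations;
    // an in-line explanation inside a method body should use a regular comment:
    /*
     * The insert failed because the provisioned throughput was exceeded, which is
     * treated as a temporary issue, so the FlowFile is routed for a later retry.
     */
    ```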

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;

Review comment:
       IntelliJ suggests `Integer.parseInt()` here instead of `Integer.valueOf()`; it parses straight to a primitive `int` and avoids the unnecessary boxing.
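
       A possible rewrite of the line in question (just a sketch; the local variable name is illustrative only, and it also reads the attribute once instead of twice):

       ```java
       final String processedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE);
       final int alreadyProcessedChunks = processedChunks != null ? Integer.parseInt(processedChunks) : 0;
       ```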

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");

Review comment:
       It should be mentioned here or in the Additional Details that this strategy only works when the sort key is of type `Number`.
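
       One way the allowable value description could carry this hint (the wording is only a suggestion):

       ```java
       static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
               "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. " +
               "This will be used as sort key value. This strategy requires the sort key of the DynamoDB table to be of type Number.");
       ```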

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {
+            /**
+             * When DynamoDB returns with {@code ProvisionedThroughputExceededException}, the client reached it's write limitation and
+             * should be retried at a later time. We yield the processor and the the FlowFile is considered unprocessed (partially processed) due to temporary write limitations.
+             * More about throughput limitations: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html
+             */
+            context.yield();
+            session.transfer(outgoingFlowFile, REL_UNPROCESSED);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonClientException) {
+            getLogger().error("Could not process FlowFile due to client exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processClientException(session, Collections.singletonList(outgoingFlowFile), (AmazonClientException) result.getThrowable().getCause()), REL_FAILURE);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonServiceException) {
+            getLogger().error("Could not process FlowFile due to server exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), (AmazonServiceException) result.getThrowable().getCause()), REL_FAILURE);
+        } else {
+            getLogger().error("Could not process FlowFile: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(outgoingFlowFile, REL_FAILURE);
+        }
+    }
+
+    private static class DynamoDbSplitRecordSetHandler extends SplitRecordSetHandler {
+        private final DynamoDB dynamoDB;
+        private final String tableName;
+        private final ProcessContext context;
+        private final Map<String, String> flowFileAttributes;
+        private final ComponentLog logger;
+        private TableWriteItems accumulator;
+        private int itemCounter = 0;
+
+        private DynamoDbSplitRecordSetHandler(
+                final int maxChunkSize,
+                final DynamoDB dynamoDB,
+                final ProcessContext context,
+                final Map<String, String> flowFileAttributes,
+                final ComponentLog logger) {
+            super(maxChunkSize);
+            this.dynamoDB = dynamoDB;
+            this.context = context;
+            this.flowFileAttributes = flowFileAttributes;
+            this.logger = logger;
+            this.tableName = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+            accumulator = new TableWriteItems(tableName);
+        }
+
+        @Override
+        protected void handleChunk(final boolean wasBatchAlreadyProcessed) throws SplitRecordSetHandlerException {
+            try {
+                if (!wasBatchAlreadyProcessed) {
+                    final BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(accumulator);
+
+                    if (!outcome.getUnprocessedItems().isEmpty()) {
+                        logger.error("Could not insert all items. The unprocessed items are: " + outcome.getUnprocessedItems().toString());
+                        throw new SplitRecordSetHandlerException("Could not insert all items");

Review comment:
       Do we need both a separate error log and an exception here? The exception message could carry the details (the unprocessed items) that are currently only logged.
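
       For example, the unprocessed items could be folded into the exception message and the separate log call dropped (a sketch based on the snippet above):

       ```java
       if (!outcome.getUnprocessedItems().isEmpty()) {
           throw new SplitRecordSetHandlerException(
                   "Could not insert all items. The unprocessed items are: " + outcome.getUnprocessedItems());
       }
       ```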

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {
+            /**
+             * When DynamoDB returns with {@code ProvisionedThroughputExceededException}, the client reached it's write limitation and
+             * should be retried at a later time. We yield the processor and the the FlowFile is considered unprocessed (partially processed) due to temporary write limitations.

Review comment:
       Typo: "the the"

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {

Review comment:
       The null check is not necessary; `instanceof` already evaluates to `false` when the operand is `null`.
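   A minimal, self-contained sketch (not part of the PR) showing that `instanceof` already handles `null`:

```java
public class InstanceOfNullCheckDemo {
    public static void main(String[] args) {
        final Throwable cause = null;
        // instanceof evaluates to false for a null operand, so no separate null check is needed
        System.out.println(cause instanceof RuntimeException); // prints "false", no NullPointerException
    }
}
```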

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");

Review comment:
       I think it should be mentioned that the "field" refers to a Record field. A description similar to `PARTITION_BY_FIELD`'s would be more appropriate.
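   For example, something along these lines (wording is only a suggestion that mirrors the `PARTITION_BY_FIELD` description; it assumes the surrounding class and imports from the PR):

```java
static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
        // suggested wording only: makes it explicit that the value comes from a Record field
        "Uses the value of the Record field identified by the \"Sort Key Field\" property as sort key value.");
```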

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {
+            /**
+             * When DynamoDB returns with {@code ProvisionedThroughputExceededException}, the client reached it's write limitation and
+             * should be retried at a later time. We yield the processor and the the FlowFile is considered unprocessed (partially processed) due to temporary write limitations.
+             * More about throughput limitations: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html
+             */
+            context.yield();
+            session.transfer(outgoingFlowFile, REL_UNPROCESSED);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonClientException) {
+            getLogger().error("Could not process FlowFile due to client exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processClientException(session, Collections.singletonList(outgoingFlowFile), (AmazonClientException) result.getThrowable().getCause()), REL_FAILURE);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonServiceException) {
+            getLogger().error("Could not process FlowFile due to server exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), (AmazonServiceException) result.getThrowable().getCause()), REL_FAILURE);
+        } else {
+            getLogger().error("Could not process FlowFile: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(outgoingFlowFile, REL_FAILURE);
+        }
+    }
+
+    private static class DynamoDbSplitRecordSetHandler extends SplitRecordSetHandler {
+        private final DynamoDB dynamoDB;
+        private final String tableName;
+        private final ProcessContext context;
+        private final Map<String, String> flowFileAttributes;
+        private final ComponentLog logger;
+        private TableWriteItems accumulator;
+        private int itemCounter = 0;
+
+        private DynamoDbSplitRecordSetHandler(
+                final int maxChunkSize,
+                final DynamoDB dynamoDB,
+                final ProcessContext context,
+                final Map<String, String> flowFileAttributes,
+                final ComponentLog logger) {
+            super(maxChunkSize);
+            this.dynamoDB = dynamoDB;
+            this.context = context;
+            this.flowFileAttributes = flowFileAttributes;
+            this.logger = logger;
+            this.tableName = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+            accumulator = new TableWriteItems(tableName);
+        }
+
+        @Override
+        protected void handleChunk(final boolean wasBatchAlreadyProcessed) throws SplitRecordSetHandlerException {
+            try {
+                if (!wasBatchAlreadyProcessed) {
+                    final BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(accumulator);
+
+                    if (!outcome.getUnprocessedItems().isEmpty()) {
+                        logger.error("Could not insert all items. The unprocessed items are: " + outcome.getUnprocessedItems().toString());
+                        throw new SplitRecordSetHandlerException("Could not insert all items");
+                    }
+                } else {
+                    logger.debug("Skipping chunk as was already processed");
+                }
+
+                accumulator = new TableWriteItems(tableName);
+            } catch (final Exception e) {
+                throw new SplitRecordSetHandlerException(e);
+            }
+        }
+
+        @Override
+        protected void addToChunk(final Record record) {
+            itemCounter++;
+            accumulator.addItemToPut(convert(record));
+        }
+
+        private Item convert(final Record record) {
+            final String partitionField  = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();

Review comment:
       Renaming the local variable to `partitionKeyField` (rather than `partitionField`) would be consistent with the property name.
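   Roughly (hypothetical rename only, behavior unchanged):

```java
// hypothetical rename of the local variable to match the "Partition Key Field" property
final String partitionKeyField = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
```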

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");

Review comment:
       `Partition Field` => `Partition Key Field` (the property's display name is "Partition Key Field", so the description should reference it exactly).
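   A possible correction of the description (wording only; the rest of the declaration stays as it is in the PR):

```java
static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
        // references the actual property name, "Partition Key Field"
        "Uses the value of the Record field identified by the \"Partition Key Field\" property as partition key value.");
```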

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {
+            /**
+             * When DynamoDB returns with {@code ProvisionedThroughputExceededException}, the client reached it's write limitation and
+             * should be retried at a later time. We yield the processor and the the FlowFile is considered unprocessed (partially processed) due to temporary write limitations.
+             * More about throughput limitations: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html
+             */
+            context.yield();
+            session.transfer(outgoingFlowFile, REL_UNPROCESSED);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonClientException) {
+            getLogger().error("Could not process FlowFile due to client exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processClientException(session, Collections.singletonList(outgoingFlowFile), (AmazonClientException) result.getThrowable().getCause()), REL_FAILURE);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonServiceException) {
+            getLogger().error("Could not process FlowFile due to server exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), (AmazonServiceException) result.getThrowable().getCause()), REL_FAILURE);
+        } else {
+            getLogger().error("Could not process FlowFile: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(outgoingFlowFile, REL_FAILURE);
+        }
+    }
+
+    private static class DynamoDbSplitRecordSetHandler extends SplitRecordSetHandler {
+        private final DynamoDB dynamoDB;
+        private final String tableName;
+        private final ProcessContext context;
+        private final Map<String, String> flowFileAttributes;
+        private final ComponentLog logger;
+        private TableWriteItems accumulator;
+        private int itemCounter = 0;
+
+        private DynamoDbSplitRecordSetHandler(
+                final int maxChunkSize,
+                final DynamoDB dynamoDB,
+                final ProcessContext context,
+                final Map<String, String> flowFileAttributes,
+                final ComponentLog logger) {
+            super(maxChunkSize);
+            this.dynamoDB = dynamoDB;
+            this.context = context;
+            this.flowFileAttributes = flowFileAttributes;
+            this.logger = logger;
+            this.tableName = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+            accumulator = new TableWriteItems(tableName);
+        }
+
+        @Override
+        protected void handleChunk(final boolean wasBatchAlreadyProcessed) throws SplitRecordSetHandlerException {
+            try {
+                if (!wasBatchAlreadyProcessed) {
+                    final BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(accumulator);
+
+                    if (!outcome.getUnprocessedItems().isEmpty()) {
+                        logger.error("Could not insert all items. The unprocessed items are: " + outcome.getUnprocessedItems().toString());
+                        throw new SplitRecordSetHandlerException("Could not insert all items");
+                    }
+                } else {
+                    logger.debug("Skipping chunk as was already processed");
+                }
+
+                accumulator = new TableWriteItems(tableName);
+            } catch (final Exception e) {
+                throw new SplitRecordSetHandlerException(e);
+            }
+        }
+
+        @Override
+        protected void addToChunk(final Record record) {
+            itemCounter++;
+            accumulator.addItemToPut(convert(record));
+        }
+
+        private Item convert(final Record record) {
+            final String partitionField  = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
+            final String sortKeyStrategy  = context.getProperty(SORT_KEY_STRATEGY).getValue();
+            final String sortKeyField  = context.getProperty(SORT_KEY_FIELD).evaluateAttributeExpressions().getValue();
+
+            final Item result = new Item();
+
+            record.getSchema()
+                    .getFields()
+                    .stream()
+                    .filter(field -> !field.getFieldName().equals(partitionField))
+                    .filter(field -> SORT_NONE.getValue().equals(sortKeyStrategy) || !field.getFieldName().equals(sortKeyField))
+                    .forEach(field -> RecordToItemConverter.addField(record, result, field.getDataType().getFieldType(), field.getFieldName()));
+
+            addPartition(record, result);

Review comment:
       Renaming `addPartition` to `addPartitionKey` would be consistent with the property names and the partition key terminology.
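   Sketch of the rename (hypothetical; the helper body itself would stay as it is in the PR):

```java
// hypothetical rename: addPartition(...) -> addPartitionKey(...)
addPartitionKey(record, result);
```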

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {
+            /**
+             * When DynamoDB returns with {@code ProvisionedThroughputExceededException}, the client reached it's write limitation and
+             * should be retried at a later time. We yield the processor and the the FlowFile is considered unprocessed (partially processed) due to temporary write limitations.
+             * More about throughput limitations: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html
+             */
+            context.yield();
+            session.transfer(outgoingFlowFile, REL_UNPROCESSED);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonClientException) {
+            getLogger().error("Could not process FlowFile due to client exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processClientException(session, Collections.singletonList(outgoingFlowFile), (AmazonClientException) result.getThrowable().getCause()), REL_FAILURE);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonServiceException) {
+            getLogger().error("Could not process FlowFile due to server exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), (AmazonServiceException) result.getThrowable().getCause()), REL_FAILURE);
+        } else {
+            getLogger().error("Could not process FlowFile: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(outgoingFlowFile, REL_FAILURE);
+        }
+    }
+
+    private static class DynamoDbSplitRecordSetHandler extends SplitRecordSetHandler {
+        private final DynamoDB dynamoDB;
+        private final String tableName;
+        private final ProcessContext context;
+        private final Map<String, String> flowFileAttributes;
+        private final ComponentLog logger;
+        private TableWriteItems accumulator;
+        private int itemCounter = 0;
+
+        private DynamoDbSplitRecordSetHandler(
+                final int maxChunkSize,
+                final DynamoDB dynamoDB,
+                final ProcessContext context,
+                final Map<String, String> flowFileAttributes,
+                final ComponentLog logger) {
+            super(maxChunkSize);
+            this.dynamoDB = dynamoDB;
+            this.context = context;
+            this.flowFileAttributes = flowFileAttributes;
+            this.logger = logger;
+            this.tableName = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+            accumulator = new TableWriteItems(tableName);
+        }
+
+        @Override
+        protected void handleChunk(final boolean wasBatchAlreadyProcessed) throws SplitRecordSetHandlerException {
+            try {
+                if (!wasBatchAlreadyProcessed) {
+                    final BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(accumulator);
+
+                    if (!outcome.getUnprocessedItems().isEmpty()) {
+                        logger.error("Could not insert all items. The unprocessed items are: " + outcome.getUnprocessedItems().toString());
+                        throw new SplitRecordSetHandlerException("Could not insert all items");
+                    }
+                } else {
+                    logger.debug("Skipping chunk as was already processed");
+                }
+
+                accumulator = new TableWriteItems(tableName);
+            } catch (final Exception e) {
+                throw new SplitRecordSetHandlerException(e);
+            }
+        }
+
+        @Override
+        protected void addToChunk(final Record record) {
+            itemCounter++;
+            accumulator.addItemToPut(convert(record));
+        }
+
+        private Item convert(final Record record) {
+            final String partitionField  = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
+            final String sortKeyStrategy  = context.getProperty(SORT_KEY_STRATEGY).getValue();
+            final String sortKeyField  = context.getProperty(SORT_KEY_FIELD).evaluateAttributeExpressions().getValue();
+
+            final Item result = new Item();
+
+            record.getSchema()
+                    .getFields()
+                    .stream()
+                    .filter(field -> !field.getFieldName().equals(partitionField))
+                    .filter(field -> SORT_NONE.getValue().equals(sortKeyStrategy) || !field.getFieldName().equals(sortKeyField))
+                    .forEach(field -> RecordToItemConverter.addField(record, result, field.getDataType().getFieldType(), field.getFieldName()));
+
+            addPartition(record, result);
+            addSortKey(record, result);
+            return result;
+        }
+
+        private void addPartition(final Record record, final Item result) {
+            final String partitionStrategy = context.getProperty(PARTITION_KEY_STRATEGY).getValue();
+            final String partitionField  = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
+            final String partitionAttribute = context.getProperty(PARTITION_KEY_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+
+            if (PARTITION_BY_FIELD.getValue().equals(partitionStrategy)) {
+                if (!record.getSchema().getFieldNames().contains(partitionField)) {
+                    throw new ProcessException("\"By field partition\" strategy needs the \"Partition Field\" to present in the record");

Review comment:
       Wrong property names: `By field partition` => `Partition By Field`, `Partition Field` => `Partition Key Field`.
   You can use the `displayName` of the `PropertyDescriptor` / `AllowableValue` with `String.format()`.
   Please also check the other error messages below.
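   Something along these lines (untested sketch, reusing the existing constants):

   ```java
   if (!record.getSchema().getFieldNames().contains(partitionField)) {
       throw new ProcessException(String.format(
               "\"%s\" strategy requires the \"%s\" field to be present in the record",
               PARTITION_BY_FIELD.getDisplayName(),
               PARTITION_KEY_FIELD.getDisplayName()));
   }
   ```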

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBRecord.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.SystemResourceConsiderations;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SplitRecordSetHandler;
+import org.apache.nifi.serialization.SplitRecordSetHandlerException;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert", "Record"})
+@CapabilityDescription(
+        "Inserts items into DynamoDB based on record-oriented data. " +
+        "The record fields are mapped into DynamoDB item fields, including partition and sort keys if set. " +
+        "Depending on the number of records the processor might execute the insert in multiple chunks in order to overcome DynamoDB's limitation on batch writing. " +
+        "This might result partially processed FlowFiles in which case the FlowFile will be transferred to the \"unprocessed\" relationship " +
+        "with the necessary attribute to retry later without duplicating the already executed inserts."
+)
+@WritesAttributes({
+        @WritesAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "DynamoDB unprocessed keys"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = "DynamoDB range key error"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "DynamoDB key not found"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = "DynamoDB exception message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "DynamoDB error code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "DynamoDB error message"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "DynamoDB error type"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "DynamoDB error service"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "DynamoDB error is retryable"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "DynamoDB error request id"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "DynamoDB error status code"),
+        @WritesAttribute(attribute = AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception message on creating item")
+})
+@ReadsAttribute(attribute = PutDynamoDBRecord.DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, description = "Number of chunks successfully inserted into DynamoDB. If not set, it is considered as 0")
+@SystemResourceConsiderations({
+        @SystemResourceConsideration(resource = SystemResource.MEMORY),
+        @SystemResourceConsideration(resource = SystemResource.NETWORK)
+})
+public class PutDynamoDBRecord extends AbstractDynamoDBProcessor {
+
+    /**
+     * Due to DynamoDB's hardcoded limitation on the number of items in one batch, the processor writes them in chunks.
+     * Every chunk contains a number of items according to the limitations.
+     */
+    private static final int MAXIMUM_CHUNK_SIZE = 25;
+
+    static final String DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE = "dynamodb.chunks.processed";
+
+    static final AllowableValue PARTITION_BY_FIELD = new AllowableValue("ByField", "Partition By Field",
+            "Uses the value of the Record field identified by the \"Partition Field\" property as partition key value.");
+    static final AllowableValue PARTITION_BY_ATTRIBUTE = new AllowableValue("ByAttribute", "Partition By Attribute",
+            "Uses an incoming FlowFile attribute identified by \"Partition Attribute\" as the value of the partition key. " +
+            "The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+    static final AllowableValue PARTITION_GENERATED = new AllowableValue("Generated", "Generated UUID",
+            "Uses a generated UUID as value for the partition key. The incoming Records must not contain field with the same name defined by the \"Partition Key Field\".");
+
+    static final AllowableValue SORT_NONE = new AllowableValue("None", "None",
+            "The processor will not assign sort key to the inserted Items.");
+    static final AllowableValue SORT_BY_FIELD = new AllowableValue("ByField", "Sort By Field",
+            "With this strategy, the processor will use the value of the field identified by \"Sort Key Field\" as sort key value.");
+    static final AllowableValue SORT_BY_SEQUENCE = new AllowableValue("BySequence", "Generate Sequence",
+            "The processor will assign a number for every item based on the original record's position in the incoming FlowFile. This will be used as sort key value.");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("partition-key-strategy")
+            .displayName("Partition Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(PARTITION_BY_FIELD, PARTITION_BY_ATTRIBUTE, PARTITION_GENERATED)
+            .defaultValue(PARTITION_BY_FIELD.getValue())
+            .description("Defines the strategy the processor uses to assign partition key value to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("partition-key-field")
+            .displayName("Partition Key Field")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description(
+                    "Defines the name of the partition key field in the DynamoDB table. Partition key is also known as hash key. " +
+                    "Depending on the \"Partition Key Strategy\" the field value might come from the incoming Record or a generated one.")
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("partition-key-attribute")
+            .displayName("Partition Key Attribute")
+            .required(true)
+            .dependsOn(PARTITION_KEY_STRATEGY, PARTITION_BY_ATTRIBUTE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Specifies the FlowFile attribute that will be used as the value of the partition key when using \"Partition by attribute\" partition key strategy.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_STRATEGY = new PropertyDescriptor.Builder()
+            .name("sort-key-strategy")
+            .displayName("Sort Key Strategy")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(SORT_NONE, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .defaultValue(SORT_NONE.getValue())
+            .description("Defines the strategy the processor uses to assign sort key to the inserted Items.")
+            .build();
+
+    static final PropertyDescriptor SORT_KEY_FIELD = new PropertyDescriptor.Builder()
+            .name("sort-key-field")
+            .displayName("Sort Key Field")
+            .required(true)
+            .dependsOn(SORT_KEY_STRATEGY, SORT_BY_FIELD, SORT_BY_SEQUENCE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("Defines the name of the sort key field in the DynamoDB table. Sort key is also known as range key.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            RECORD_READER,
+            new PropertyDescriptor.Builder().fromPropertyDescriptor(AWS_CREDENTIALS_PROVIDER_SERVICE).required(true).build(),
+            REGION,
+            TABLE,
+            PARTITION_KEY_STRATEGY,
+            PARTITION_KEY_FIELD,
+            PARTITION_KEY_ATTRIBUTE,
+            SORT_KEY_STRATEGY,
+            SORT_KEY_FIELD,
+            TIMEOUT,
+            ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
+            SSL_CONTEXT_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int alreadyProcessedChunks = flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE) != null ? Integer.valueOf(flowFile.getAttribute(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE)) : 0;
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final SplitRecordSetHandler handler = new DynamoDbSplitRecordSetHandler(MAXIMUM_CHUNK_SIZE, getDynamoDB(), context, flowFile.getAttributes(), getLogger());
+        final SplitRecordSetHandler.RecordHandlerResult result;
+
+        try (
+            final InputStream in = session.read(flowFile);
+            final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            result = handler.handle(reader.createRecordSet(), alreadyProcessedChunks);
+        } catch (final Exception e) {
+            getLogger().error("Error while reading records: " + e.getMessage(), e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(DYNAMODB_CHUNKS_PROCESSED_ATTRIBUTE, String.valueOf(result.getSuccessfulChunks()));
+        final FlowFile outgoingFlowFile = session.putAllAttributes(flowFile, attributes);
+
+        if (result.isSuccess()) {
+            session.transfer(outgoingFlowFile, REL_SUCCESS);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof ProvisionedThroughputExceededException) {
+            /**
+             * When DynamoDB returns with {@code ProvisionedThroughputExceededException}, the client reached it's write limitation and
+             * should be retried at a later time. We yield the processor and the the FlowFile is considered unprocessed (partially processed) due to temporary write limitations.
+             * More about throughput limitations: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html
+             */
+            context.yield();
+            session.transfer(outgoingFlowFile, REL_UNPROCESSED);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonClientException) {
+            getLogger().error("Could not process FlowFile due to client exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processClientException(session, Collections.singletonList(outgoingFlowFile), (AmazonClientException) result.getThrowable().getCause()), REL_FAILURE);
+        } else if (result.getThrowable().getCause() != null && result.getThrowable().getCause() instanceof AmazonServiceException) {
+            getLogger().error("Could not process FlowFile due to server exception: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(processServiceException(session, Collections.singletonList(outgoingFlowFile), (AmazonServiceException) result.getThrowable().getCause()), REL_FAILURE);
+        } else {
+            getLogger().error("Could not process FlowFile: " + result.getThrowable().getMessage(), result.getThrowable());
+            session.transfer(outgoingFlowFile, REL_FAILURE);
+        }
+    }
+
+    private static class DynamoDbSplitRecordSetHandler extends SplitRecordSetHandler {
+        private final DynamoDB dynamoDB;
+        private final String tableName;
+        private final ProcessContext context;
+        private final Map<String, String> flowFileAttributes;
+        private final ComponentLog logger;
+        private TableWriteItems accumulator;
+        private int itemCounter = 0;
+
+        private DynamoDbSplitRecordSetHandler(
+                final int maxChunkSize,
+                final DynamoDB dynamoDB,
+                final ProcessContext context,
+                final Map<String, String> flowFileAttributes,
+                final ComponentLog logger) {
+            super(maxChunkSize);
+            this.dynamoDB = dynamoDB;
+            this.context = context;
+            this.flowFileAttributes = flowFileAttributes;
+            this.logger = logger;
+            this.tableName = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+            accumulator = new TableWriteItems(tableName);
+        }
+
+        @Override
+        protected void handleChunk(final boolean wasBatchAlreadyProcessed) throws SplitRecordSetHandlerException {
+            try {
+                if (!wasBatchAlreadyProcessed) {
+                    final BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(accumulator);
+
+                    if (!outcome.getUnprocessedItems().isEmpty()) {
+                        logger.error("Could not insert all items. The unprocessed items are: " + outcome.getUnprocessedItems().toString());
+                        throw new SplitRecordSetHandlerException("Could not insert all items");
+                    }
+                } else {
+                    logger.debug("Skipping chunk as was already processed");
+                }
+
+                accumulator = new TableWriteItems(tableName);
+            } catch (final Exception e) {
+                throw new SplitRecordSetHandlerException(e);
+            }
+        }
+
+        @Override
+        protected void addToChunk(final Record record) {
+            itemCounter++;
+            accumulator.addItemToPut(convert(record));
+        }
+
+        private Item convert(final Record record) {
+            final String partitionField  = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
+            final String sortKeyStrategy  = context.getProperty(SORT_KEY_STRATEGY).getValue();
+            final String sortKeyField  = context.getProperty(SORT_KEY_FIELD).evaluateAttributeExpressions().getValue();
+
+            final Item result = new Item();
+
+            record.getSchema()
+                    .getFields()
+                    .stream()
+                    .filter(field -> !field.getFieldName().equals(partitionField))
+                    .filter(field -> SORT_NONE.getValue().equals(sortKeyStrategy) || !field.getFieldName().equals(sortKeyField))
+                    .forEach(field -> RecordToItemConverter.addField(record, result, field.getDataType().getFieldType(), field.getFieldName()));
+
+            addPartition(record, result);
+            addSortKey(record, result);
+            return result;
+        }
+
+        private void addPartition(final Record record, final Item result) {
+            final String partitionStrategy = context.getProperty(PARTITION_KEY_STRATEGY).getValue();
+            final String partitionField  = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
+            final String partitionAttribute = context.getProperty(PARTITION_KEY_ATTRIBUTE).evaluateAttributeExpressions().getValue();

Review comment:
       `partitionKey*` names (e.g. `partitionKeyStrategy`, `partitionKeyField`) would be more consistent with the property names.
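   For example (rename only, behaviour unchanged):

   ```java
   final String partitionKeyStrategy = context.getProperty(PARTITION_KEY_STRATEGY).getValue();
   final String partitionKeyField = context.getProperty(PARTITION_KEY_FIELD).evaluateAttributeExpressions().getValue();
   final String partitionKeyAttribute = context.getProperty(PARTITION_KEY_ATTRIBUTE).evaluateAttributeExpressions().getValue();
   ```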

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
##########
@@ -0,0 +1,258 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutDynamoDBRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <h2>Description</h2>
+
+    <p>
+        <i>PutDynamoDBRecord</i> intends to provide the capability to insert multiple Items into a DynamoDB table from a record-oriented FlowFile.
+        Compared to the <i>PutDynamoDB</i>, this processor is capable to process data based other than JSON format too and prepared to add multiple fields for a given Item.
+        Also, <i>PutDynamoDBRecord</i> is designed to insert bigger batches of data into the database.
+    </p>
+
+    <h2>Data types</h2>
+
+    <p>
+        The list data types supported by DynamoDB does not fully overlaps with the capabilities of the Record data structure.
+        Some conversions and simplifications are necessary during inserting the data. These are:
+    </p>
+
+    <ul>
+        <li>Numeric values are stored using a floating-point data structure within Items. In some cases this representation might cause issues with the accuracy.</li>
+        <li>Char is not a supported type within DynamoDB, these fields are converted into String values.</li>
+        <li>Enum types are stored as String fields, using the name of the given enum.</li>
+        <li>DynamoDB stores time and date related information as Strings.</li>
+        <li>Internal record structures are converted into maps.</li>
+        <li>Choice is not a supported data type, regardless of the actual wrapped data type, values enveloped in Choice are handled as Strings.</li>
+        <li>Unknown data types are handles as stings.</li>
+    </ul>
+
+    <h2>Limitations</h2>
+
+    <p>
+        Working with DynamoDB when batch inserting comes with two inherit limitations. First, the number of inserted Items is limited to 25 in any case.
+        In order to overcome this, during one execution, depending on the number or records in the incoming FlowFile, <i>PutDynamoDBRecord</i> might attempt multiple
+        insert calls towards the database server. Using this approach, the flow does not have to work with this limitation in most cases.
+    </p>
+
+    <p>
+        Having multiple external actions comes with the risk of having an unforeseen result at one of the steps.
+        For example when the incoming FlowFile is consists of 70 records, it will be split into 3 chunks, with a single insert operation for every chunk.
+        The first two chunks contains 25 Items to insert per chunk, and the third contains the remaining 20. In some cases it might occur that the first two insert operation succeeds but the third one fails.
+        In these cases we consider the FlowFile "partially processed" and we will transfer it to the "failure" or "unprocessed" Relationship according to the nature of the issue.
+        In order to keep the information about the successfully processed chunks the processor assigns the <i>"dynamodb.chunks.processed"</i> attribute to the FlowFile, which has the number of successfully processed chunks as value.
+    </p>
+
+    <p>
+        The most common reason for this behaviour comes from the other limitation the inserts have with DynamoDB: the database has a build in supervision over the amount of inserted data.
+        When a client reaches the "throughput limit", the server refuses to process the insert request until a certain amount of time. More information on this might be find <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html">here</a>.
+        From the perspective of the <i>PutDynamoDBRecord</i> we consider these cases as temporary issues and the FlowFile will be transferred to the "unprocessed" Relationship after which the processor will yield in order to avoid further throughput issues.
+        (Other kinds of failures will result transfer to the "failure" Relationship)
+    </p>
+
+    <h2>Retry</h2>
+
+    <p>
+        It is suggested to loop back the "unprocessed" Relationship to the <i>PutDynamoDBRecord</i> in some way. FlowFiles transferred to that relationship considered as healthy ones might be successfully processed in a later point.
+        It is possible that the FlowFile contains such a high number of records, what needs more than two attempts to fully insert.
+        The attribute "dynamodb.chunks.processed" is "rolled" through the attempts, which means, after each trigger it will contain the sum number of inserted chunks making it possible for the later attempts to continue from the right point without duplicated inserts.
+    </p>
+
+    <h2>Partition and sort keys</h2>
+
+    <p>
+        The processor supports multiple strategies for assigning partition key and sort key to the inserted Items. These are:
+    </p>
+
+    <h3>Partition Key Strategies</h3>
+
+    <h4>Partition By Field</h4>
+
+    <p>
+        The processors assigns one of the record field as partition key. The name of the record field is specified by the "Partition Key Field" attribute and the value will be the value of the record field with the same name.
+    </p>
+
+    <h4>Partition By Attribute</h4>
+
+    <p>
+        The processor assigns the value of a FlowFile attribute as partition key. With this strategy all the Items within a FlowFile will share the same partition case and it is suggested to use a sort key in order to meet the primary key requirements of the DynamoDB.

Review comment:
       Typo (?): `partition case` => `partition key`

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
##########
@@ -0,0 +1,258 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutDynamoDBRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <h2>Description</h2>
+
+    <p>
+        <i>PutDynamoDBRecord</i> intends to provide the capability to insert multiple Items into a DynamoDB table from a record-oriented FlowFile.
+        Compared to the <i>PutDynamoDB</i>, this processor is capable to process data based other than JSON format too and prepared to add multiple fields for a given Item.
+        Also, <i>PutDynamoDBRecord</i> is designed to insert bigger batches of data into the database.
+    </p>
+
+    <h2>Data types</h2>
+
+    <p>
+        The list data types supported by DynamoDB does not fully overlaps with the capabilities of the Record data structure.
+        Some conversions and simplifications are necessary during inserting the data. These are:
+    </p>
+
+    <ul>
+        <li>Numeric values are stored using a floating-point data structure within Items. In some cases this representation might cause issues with the accuracy.</li>
+        <li>Char is not a supported type within DynamoDB, these fields are converted into String values.</li>
+        <li>Enum types are stored as String fields, using the name of the given enum.</li>
+        <li>DynamoDB stores time and date related information as Strings.</li>
+        <li>Internal record structures are converted into maps.</li>
+        <li>Choice is not a supported data type, regardless of the actual wrapped data type, values enveloped in Choice are handled as Strings.</li>
+        <li>Unknown data types are handles as stings.</li>
+    </ul>
+
+    <h2>Limitations</h2>
+
+    <p>
+        Working with DynamoDB when batch inserting comes with two inherit limitations. First, the number of inserted Items is limited to 25 in any case.
+        In order to overcome this, during one execution, depending on the number or records in the incoming FlowFile, <i>PutDynamoDBRecord</i> might attempt multiple
+        insert calls towards the database server. Using this approach, the flow does not have to work with this limitation in most cases.
+    </p>
+
+    <p>
+        Having multiple external actions comes with the risk of having an unforeseen result at one of the steps.
+        For example when the incoming FlowFile is consists of 70 records, it will be split into 3 chunks, with a single insert operation for every chunk.
+        The first two chunks contains 25 Items to insert per chunk, and the third contains the remaining 20. In some cases it might occur that the first two insert operation succeeds but the third one fails.
+        In these cases we consider the FlowFile "partially processed" and we will transfer it to the "failure" or "unprocessed" Relationship according to the nature of the issue.
+        In order to keep the information about the successfully processed chunks the processor assigns the <i>"dynamodb.chunks.processed"</i> attribute to the FlowFile, which has the number of successfully processed chunks as value.
+    </p>
+
+    <p>
+        The most common reason for this behaviour comes from the other limitation the inserts have with DynamoDB: the database has a build in supervision over the amount of inserted data.
+        When a client reaches the "throughput limit", the server refuses to process the insert request until a certain amount of time. More information on this might be find <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html">here</a>.
+        From the perspective of the <i>PutDynamoDBRecord</i> we consider these cases as temporary issues and the FlowFile will be transferred to the "unprocessed" Relationship after which the processor will yield in order to avoid further throughput issues.
+        (Other kinds of failures will result transfer to the "failure" Relationship)
+    </p>
+
+    <h2>Retry</h2>
+
+    <p>
+        It is suggested to loop back the "unprocessed" Relationship to the <i>PutDynamoDBRecord</i> in some way. FlowFiles transferred to that relationship considered as healthy ones might be successfully processed in a later point.
+        It is possible that the FlowFile contains such a high number of records, what needs more than two attempts to fully insert.
+        The attribute "dynamodb.chunks.processed" is "rolled" through the attempts, which means, after each trigger it will contain the sum number of inserted chunks making it possible for the later attempts to continue from the right point without duplicated inserts.
+    </p>
+
+    <h2>Partition and sort keys</h2>
+
+    <p>
+        The processor supports multiple strategies for assigning partition key and sort key to the inserted Items. These are:
+    </p>
+
+    <h3>Partition Key Strategies</h3>
+
+    <h4>Partition By Field</h4>
+
+    <p>
+        The processors assigns one of the record field as partition key. The name of the record field is specified by the "Partition Key Field" attribute and the value will be the value of the record field with the same name.
+    </p>
+
+    <h4>Partition By Attribute</h4>
+
+    <p>
+        The processor assigns the value of a FlowFile attribute as partition key. With this strategy all the Items within a FlowFile will share the same partition case and it is suggested to use a sort key in order to meet the primary key requirements of the DynamoDB.
+        The property "Partition Key Field" defines the name of the Item field and the property "Partition Key Attribute" will specify which attribute's value will be assigned to the partition key.
+        With this strategy the "Partition Key Field" must be different from the fields consisted by the incoming records.
+    </p>
+
+    <h4>Generated UUID</h4>
+
+    <p>
+        By using this strategy the processor will generate a UUID identifier for every single Item. This identifier will be used as value for the partition key.
+        The name of the field used as partition key is defined by the property "Partition Key Field".
+        With this strategy the "Partition Key Field" must be different from the fields consisted by the incoming records.
+    </p>
+
+    <h3>Sort Key Strategies</h3>
+
+    <h4>None</h4>
+
+    <p>
+        No sort key will be assigned to the Item. In case of the table definition expects it, using this strategy will result unsuccessful inserts.
+    </p>
+
+    <h4>Sort By Field</h4>
+
+    <p>
+        The processors assigns one of the record field as sort key. The name of the record field is specified by the "Sort Key Field" attribute and the value will be the value of the record field with the same name.
+        With this strategy the "Sort Key Field" must be different from the fields consisted by the incoming records.
+    </p>
+
+    <h4>Generate Sequence</h4>
+
+    <p>
+        The processor assigns a generated value to every Item based on the original record's position in the incoming FlowFile (regardless of the chunks).
+        The first Item will have the sort key 1, the second will have sort key 2 and so on. The generated keys are unique within a given FlowFile.
+        The name of the record field is specified by the "Sort Key Field" attribute.
+        With this strategy the "Sort Key Field" must be different from the fields consisted by the incoming records.
+
+    </p>
+
+    <h2>Examples</h2>
+
+    <h3>Using fields as partition and sort key</h3>
+
+    <h4>Setup</h4>
+
+    <ul>
+        <li>Partition Key Strategy: Partition by field</li>

Review comment:
       Please use the same labels as shown on the UI: `Partition By Field` (not `Partition by field`).

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
##########
@@ -0,0 +1,258 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutDynamoDBRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <h2>Description</h2>
+
+    <p>
+        <i>PutDynamoDBRecord</i> intends to provide the capability to insert multiple Items into a DynamoDB table from a record-oriented FlowFile.
+        Compared to the <i>PutDynamoDB</i>, this processor is capable to process data based other than JSON format too and prepared to add multiple fields for a given Item.
+        Also, <i>PutDynamoDBRecord</i> is designed to insert bigger batches of data into the database.
+    </p>
+
+    <h2>Data types</h2>
+
+    <p>
+        The list data types supported by DynamoDB does not fully overlaps with the capabilities of the Record data structure.
+        Some conversions and simplifications are necessary during inserting the data. These are:
+    </p>
+
+    <ul>
+        <li>Numeric values are stored using a floating-point data structure within Items. In some cases this representation might cause issues with the accuracy.</li>
+        <li>Char is not a supported type within DynamoDB, these fields are converted into String values.</li>
+        <li>Enum types are stored as String fields, using the name of the given enum.</li>
+        <li>DynamoDB stores time and date related information as Strings.</li>
+        <li>Internal record structures are converted into maps.</li>
+        <li>Choice is not a supported data type, regardless of the actual wrapped data type, values enveloped in Choice are handled as Strings.</li>
+        <li>Unknown data types are handles as stings.</li>

Review comment:
       Typos: `handles as stings` => `handled as strings`.

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
##########
@@ -0,0 +1,258 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutDynamoDBRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <h2>Description</h2>
+
+    <p>
+        <i>PutDynamoDBRecord</i> provides the capability to insert multiple Items into a DynamoDB table from a record-oriented FlowFile.
+        Compared to <i>PutDynamoDB</i>, this processor can process data in formats other than JSON and can add multiple fields to a given Item.
+        Also, <i>PutDynamoDBRecord</i> is designed to insert larger batches of data into the database.
+    </p>
+
+    <h2>Data types</h2>
+
+    <p>
+        The list of data types supported by DynamoDB does not fully overlap with the capabilities of the Record data structure.
+        Some conversions and simplifications are necessary when inserting the data. These are:
+    </p>
+
+    <ul>
+        <li>Numeric values are stored using a floating-point data structure within Items. In some cases this representation might cause accuracy issues.</li>
+        <li>Char is not a supported type within DynamoDB; these fields are converted into String values.</li>
+        <li>Enum types are stored as String fields, using the name of the given enum.</li>
+        <li>DynamoDB stores time- and date-related information as Strings.</li>
+        <li>Internal record structures are converted into maps.</li>
+        <li>Choice is not a supported data type; regardless of the actual wrapped data type, values enveloped in Choice are handled as Strings.</li>
+        <li>Unknown data types are handles as stings.</li>
+    </ul>
+
+    <h2>Limitations</h2>
+
+    <p>
+        Batch inserting into DynamoDB comes with two inherent limitations. First, the number of inserted Items is limited to 25 in any case.
+        In order to overcome this, during one execution, depending on the number of records in the incoming FlowFile, <i>PutDynamoDBRecord</i> might attempt multiple
+        insert calls towards the database server. Using this approach, the flow does not have to deal with this limitation in most cases.
+    </p>
+
+    <p>
+        Having multiple external actions comes with the risk of an unforeseen result at one of the steps.
+        For example, when the incoming FlowFile consists of 70 records, it will be split into 3 chunks, with a single insert operation for every chunk.
+        The first two chunks contain 25 Items each, and the third contains the remaining 20. In some cases it might occur that the first two insert operations succeed but the third one fails.
+        In these cases we consider the FlowFile "partially processed" and we will transfer it to the "failure" or "unprocessed" Relationship according to the nature of the issue.
+        In order to keep the information about the successfully processed chunks, the processor assigns the <i>"dynamodb.chunks.processed"</i> attribute to the FlowFile, which has the number of successfully processed chunks as its value.
+    </p>
+
+    <p>
+        The most common reason for this behaviour comes from the other limitation of DynamoDB inserts: the database has built-in supervision over the amount of inserted data.
+        When a client reaches the "throughput limit", the server refuses to process the insert request for a certain amount of time. More information on this can be found <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html">here</a>.
+        From the perspective of <i>PutDynamoDBRecord</i> we consider these cases temporary issues and the FlowFile will be transferred to the "unprocessed" Relationship, after which the processor will yield in order to avoid further throughput issues.
+        (Other kinds of failures will result in a transfer to the "failure" Relationship.)
+    </p>
+
+    <h2>Retry</h2>
+
+    <p>
+        It is suggested to loop back the "unprocessed" Relationship to the <i>PutDynamoDBRecord</i> in some way. FlowFiles transferred to that relationship are considered healthy and might be successfully processed at a later point.
+        It is possible that the FlowFile contains such a high number of records that it needs more than two attempts to be fully inserted.
+        The attribute "dynamodb.chunks.processed" is "rolled" through the attempts, which means that after each trigger it will contain the total number of inserted chunks, making it possible for later attempts to continue from the right point without duplicated inserts.
+    </p>
+
+    <h2>Partition and sort keys</h2>
+
+    <p>
+        The processor supports multiple strategies for assigning partition key and sort key to the inserted Items. These are:
+    </p>
+
+    <h3>Partition Key Strategies</h3>
+
+    <h4>Partition By Field</h4>
+
+    <p>
+        The processor assigns one of the record fields as partition key. The name of the record field is specified by the "Partition Key Field" property and the value will be the value of the record field with the same name.
+    </p>
+
+    <h4>Partition By Attribute</h4>
+
+    <p>
+        The processor assigns the value of a FlowFile attribute as partition key. With this strategy all the Items within a FlowFile will share the same partition key and it is suggested to use a sort key in order to meet the primary key requirements of DynamoDB.
+        The property "Partition Key Field" defines the name of the Item field and the property "Partition Key Attribute" specifies which attribute's value will be assigned to the partition key.
+        With this strategy the "Partition Key Field" must be different from the fields contained in the incoming records.
+    </p>
+
+    <h4>Generated UUID</h4>
+
+    <p>
+        By using this strategy the processor will generate a UUID identifier for every single Item. This identifier will be used as the value for the partition key.
+        The name of the field used as partition key is defined by the property "Partition Key Field".
+        With this strategy the "Partition Key Field" must be different from the fields contained in the incoming records.
+    </p>
+
+    <h3>Sort Key Strategies</h3>
+
+    <h4>None</h4>
+
+    <p>
+        No sort key will be assigned to the Item. If the table definition expects one, using this strategy will result in unsuccessful inserts.
+    </p>
+
+    <h4>Sort By Field</h4>
+
+    <p>
+        The processors assigns one of the record field as sort key. The name of the record field is specified by the "Sort Key Field" attribute and the value will be the value of the record field with the same name.
+        With this strategy the "Sort Key Field" must be different from the fields consisted by the incoming records.
+    </p>
+
+    <h4>Generate Sequence</h4>
+
+    <p>
+        The processor assigns a generated value to every Item based on the original record's position in the incoming FlowFile (regardless of the chunks).
+        The first Item will have the sort key 1, the second will have the sort key 2, and so on. The generated keys are unique within a given FlowFile.
+        The name of the record field is specified by the "Sort Key Field" property.
+        With this strategy the "Sort Key Field" must be different from the fields contained in the incoming records.
+
+    </p>
+
+    <h2>Examples</h2>
+
+    <h3>Using fields as partition and sort key</h3>
+
+    <h4>Setup</h4>
+
+    <ul>
+        <li>Partition Key Strategy: Partition by field</li>
+        <li>Partition Key Field: class</li>
+        <li>Sort Key Strategy: Sort By Field</li>
+        <li>Sort Key Field: size</li>
+    </ul>
+
+    <p>
+        Note: both fields have to exist in the incoming records!
+    </p>
+
+    <h4>Result</h4>
+
+    <p>
+        Using this pair of strategies will result in Items identical to the incoming records (not counting the representational changes from the conversion).
+        The fields specified by the properties are added to the Items normally, the only difference being that they are flagged as (primary) key fields.
+    </p>
+
+    <h4>Input</h4>
+
+    <code>
+[{"type": "A", "subtype": 4, "class" : "t", "size": 1}]
+    </code>
+
+    <h4>Output (stylized)</h4>
+
+    <ul>
+        <li>type: String field with value "A"</li>
+        <li>subtype: Number field with value 4</li>
+        <li>class: String field with value "t" and serving as partition key</li>
+        <li>size: Number field with value 1 and serving as sort key</li>
+    </ul>
+
+    <h3>Using FlowFile filename as partition key with generated sort key</h3>
+
+    <h4>Setup</h4>
+
+    <ul>
+        <li>Partition Key Strategy: Partition By Attribute</li>
+        <li>Partition Key Field: source</li>
+        <li>Partition Key Attribute: filename</li>
+        <li>Sort Key Strategy: Generate Sequence</li>
+        <li>Sort Key Field: sort</li>
+    </ul>
+
+    <h4>Result</h4>
+
+    <p>
+        The FlowFile's filename attribute will be used as partition key. In this case all the records within the same FlowFile will share the same partition key.
+        In order to avoid collisions, if FlowFiles contain multiple records, using a sort key is suggested.
+        In this case a generated sequence is used which is guaranteed to be unique within a given FlowFile.
+    </p>
+
+    <h4>Input</h4>
+
+    <code>
+[
+    {"type": "A", "subtype": 4, "class" : "t", "size": 1},
+    {"type": "B", "subtype": 5, "class" : "m", "size": 2}
+]
+    </code>
+
+    <h4>Output (stylized)</h4>
+
+    <h5>First Item</h5>
+
+    <ul>
+        <li>source: String field with value "data46362.json" and serving as partition key</li>
+        <li>type: String field with value "A"</li>
+        <li>subtype: Number field with value 4</li>
+        <li>class: String field with value "t"</li>
+        <li>size: Number field with value 1</li>
+        <li>sort: Number field with value 1</li>

Review comment:
       You can add "and serving as sort key", as in the case of the partition key.
    The same applies to the Second Item below.

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/SplitRecordSetHandler.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+
+public abstract class SplitRecordSetHandler {
+    final int maximumChunkSize;
+
+    protected SplitRecordSetHandler(int maximumChunkSize) {
+        if (maximumChunkSize < 1) {
+            throw new IllegalArgumentException("The maximum chunk size must be a positive number");
+        }
+
+        this.maximumChunkSize = maximumChunkSize;
+    }
+
+    public final RecordHandlerResult handle(final RecordSet recordSet) throws Exception {
+        return handle(recordSet, 0);
+    }
+
+    public final RecordHandlerResult handle(final RecordSet recordSet, final int alreadyProcessedChunks) throws IOException {
+        Record record;
+        int currentChunkNumber = 1;
+        int currentChunkSize = 0;
+
+        while ((record = recordSet.next()) != null) {
+            if (currentChunkSize < maximumChunkSize) {
+                addToChunk(record);
+                currentChunkSize++;
+            } else {
+                try {
+                    handleChunk(alreadyProcessedChunks >= currentChunkNumber);
+                    addToChunk(record);
+                    currentChunkNumber++;
+                    currentChunkSize = 1;
+                } catch (final SplitRecordSetHandlerException e) {
+                    return new RecordHandlerResult(currentChunkNumber - 1, e);
+                }
+            }
+        }
+
+        // Handling the last, not fully filled chunk
+        if (currentChunkSize != 0) {
+            try {
+                handleChunk(alreadyProcessedChunks >= currentChunkNumber);
+            } catch (final SplitRecordSetHandlerException e) {
+                return new RecordHandlerResult(currentChunkNumber - 1, e);
+            }
+        }
+
+        // The incoming RecordSet was empty
+        if (currentChunkSize == 0 && currentChunkNumber == 1) {
+            currentChunkNumber = 0;
+        }
+
+        return new RecordHandlerResult(currentChunkNumber);

Review comment:
       In my opinion, it would be more straightforward this way (copying the whole method body because there are multiple small changes):
   ```
   Record record;
   int currentChunkNumber = 0;
   int currentChunkSize = 0;
   
   while ((record = recordSet.next()) != null) {
       addToChunk(record);
       currentChunkSize++;
   
       if (currentChunkSize == maximumChunkSize) {
           try {
               handleChunk(alreadyProcessedChunks > currentChunkNumber);
               currentChunkNumber++;
               currentChunkSize = 0;
           } catch (final SplitRecordSetHandlerException e) {
               return new RecordHandlerResult(currentChunkNumber, e);
           }
       }
   }
   
   // Handling the last, not fully filled chunk
   if (currentChunkSize != 0) {
       try {
           handleChunk(alreadyProcessedChunks > currentChunkNumber);
           currentChunkNumber++;
       } catch (final SplitRecordSetHandlerException e) {
           return new RecordHandlerResult(currentChunkNumber, e);
       }
   }
   
   return new RecordHandlerResult(currentChunkNumber);
   ```
   Though I do not insist on it at all.
   If you keep the original one, please fix the comment:
   `// Handling the last, not fully filled chunk` => `// Handling the last chunk`
   because it may or may not be fully filled in that implementation.
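
    For context, a minimal sketch of what a concrete handler might look like, assuming the abstract hooks are `addToChunk(Record)` and `handleChunk(boolean)` as implied by the calls above (the modifiers, the buffering approach and the class name are assumptions, not part of the PR):
    ```
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.nifi.serialization.SplitRecordSetHandler;
    import org.apache.nifi.serialization.SplitRecordSetHandlerException;
    import org.apache.nifi.serialization.record.Record;

    // Buffers records and "flushes" the buffer when a chunk is handled; the flag lets
    // chunks that were already processed in a previous attempt be skipped on retry.
    final class BufferingSplitRecordSetHandler extends SplitRecordSetHandler {
        private final List<Record> chunk = new ArrayList<>();

        BufferingSplitRecordSetHandler(final int maximumChunkSize) {
            super(maximumChunkSize);
        }

        @Override
        protected void addToChunk(final Record record) {
            chunk.add(record);
        }

        @Override
        protected void handleChunk(final boolean alreadyProcessed) throws SplitRecordSetHandlerException {
            if (!alreadyProcessed) {
                // A real implementation would write the buffered records out here
                // (e.g. a DynamoDB batch write) and wrap any failure in a
                // SplitRecordSetHandlerException.
            }
            chunk.clear();
        }
    }
    ```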

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.dynamodb.PutDynamoDBRecord/additionalDetails.html
##########
@@ -0,0 +1,258 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutDynamoDBRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <h2>Description</h2>
+
+    <p>
+        <i>PutDynamoDBRecord</i> provides the capability to insert multiple Items into a DynamoDB table from a record-oriented FlowFile.
+        Compared to <i>PutDynamoDB</i>, this processor can process data in formats other than JSON and can add multiple fields to a given Item.
+        Also, <i>PutDynamoDBRecord</i> is designed to insert larger batches of data into the database.
+    </p>
+
+    <h2>Data types</h2>
+
+    <p>
+        The list of data types supported by DynamoDB does not fully overlap with the capabilities of the Record data structure.
+        Some conversions and simplifications are necessary when inserting the data. These are:
+    </p>
+
+    <ul>
+        <li>Numeric values are stored using a floating-point data structure within Items. In some cases this representation might cause accuracy issues.</li>
+        <li>Char is not a supported type within DynamoDB; these fields are converted into String values.</li>
+        <li>Enum types are stored as String fields, using the name of the given enum.</li>
+        <li>DynamoDB stores time- and date-related information as Strings.</li>
+        <li>Internal record structures are converted into maps.</li>
+        <li>Choice is not a supported data type; regardless of the actual wrapped data type, values enveloped in Choice are handled as Strings.</li>
+        <li>Unknown data types are handles as stings.</li>
+    </ul>
+
+    <h2>Limitations</h2>
+
+    <p>
+        Batch inserting into DynamoDB comes with two inherent limitations. First, the number of inserted Items is limited to 25 in any case.
+        In order to overcome this, during one execution, depending on the number of records in the incoming FlowFile, <i>PutDynamoDBRecord</i> might attempt multiple
+        insert calls towards the database server. Using this approach, the flow does not have to deal with this limitation in most cases.
+    </p>
+
+    <p>
+        Having multiple external actions comes with the risk of an unforeseen result at one of the steps.
+        For example, when the incoming FlowFile consists of 70 records, it will be split into 3 chunks, with a single insert operation for every chunk.
+        The first two chunks contain 25 Items each, and the third contains the remaining 20. In some cases it might occur that the first two insert operations succeed but the third one fails.
+        In these cases we consider the FlowFile "partially processed" and we will transfer it to the "failure" or "unprocessed" Relationship according to the nature of the issue.
+        In order to keep the information about the successfully processed chunks, the processor assigns the <i>"dynamodb.chunks.processed"</i> attribute to the FlowFile, which has the number of successfully processed chunks as its value.
+    </p>
+
+    <p>
+        The most common reason for this behaviour comes from the other limitation of DynamoDB inserts: the database has built-in supervision over the amount of inserted data.
+        When a client reaches the "throughput limit", the server refuses to process the insert request for a certain amount of time. More information on this can be found <a href="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html">here</a>.
+        From the perspective of <i>PutDynamoDBRecord</i> we consider these cases temporary issues and the FlowFile will be transferred to the "unprocessed" Relationship, after which the processor will yield in order to avoid further throughput issues.
+        (Other kinds of failures will result in a transfer to the "failure" Relationship.)
+    </p>
+
+    <h2>Retry</h2>
+
+    <p>
+        It is suggested to loop back the "unprocessed" Relationship to the <i>PutDynamoDBRecord</i> in some way. FlowFiles transferred to that relationship are considered healthy and might be successfully processed at a later point.
+        It is possible that the FlowFile contains such a high number of records that it needs more than two attempts to be fully inserted.
+        The attribute "dynamodb.chunks.processed" is "rolled" through the attempts, which means that after each trigger it will contain the total number of inserted chunks, making it possible for later attempts to continue from the right point without duplicated inserts.
+    </p>
+
+    <h2>Partition and sort keys</h2>
+
+    <p>
+        The processor supports multiple strategies for assigning partition key and sort key to the inserted Items. These are:
+    </p>
+
+    <h3>Partition Key Strategies</h3>
+
+    <h4>Partition By Field</h4>
+
+    <p>
+        The processor assigns one of the record fields as partition key. The name of the record field is specified by the "Partition Key Field" property and the value will be the value of the record field with the same name.
+    </p>
+
+    <h4>Partition By Attribute</h4>
+
+    <p>
+        The processor assigns the value of a FlowFile attribute as partition key. With this strategy all the Items within a FlowFile will share the same partition key and it is suggested to use a sort key in order to meet the primary key requirements of DynamoDB.
+        The property "Partition Key Field" defines the name of the Item field and the property "Partition Key Attribute" specifies which attribute's value will be assigned to the partition key.
+        With this strategy the "Partition Key Field" must be different from the fields contained in the incoming records.
+    </p>
+
+    <h4>Generated UUID</h4>
+
+    <p>
+        By using this strategy the processor will generate a UUID identifier for every single Item. This identifier will be used as the value for the partition key.
+        The name of the field used as partition key is defined by the property "Partition Key Field".
+        With this strategy the "Partition Key Field" must be different from the fields contained in the incoming records.
+    </p>
+
+    <h3>Sort Key Strategies</h3>
+
+    <h4>None</h4>
+
+    <p>
+        No sort key will be assigned to the Item. If the table definition expects one, using this strategy will result in unsuccessful inserts.
+    </p>
+
+    <h4>Sort By Field</h4>
+
+    <p>
+        The processors assigns one of the record field as sort key. The name of the record field is specified by the "Sort Key Field" attribute and the value will be the value of the record field with the same name.

Review comment:
       Typos:
   
   - `one of the record fields`
   - `"Sort Key Field" property`

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/RecordToItemConverterTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import com.amazonaws.services.dynamodbv2.document.Item;
+import org.apache.nifi.action.Component;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;

Review comment:
       In new tests only JUnit 5's `Assertions` should be used.
   Please remove `org.junit.Assert` and replace the `Assert.assert*` calls with `Assertions.assert*`.
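
    For illustration, a minimal sketch of the requested style (the assertion target is made up; the real test asserts on converted `Item` values):
    ```
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;

    class Junit5AssertionStyleSketch {

        @Test
        void usesJunit5Assertions() {
            // Same assertion as before, but via JUnit 5's Assertions
            // instead of org.junit.Assert.
            Assertions.assertEquals("STRING", RecordFieldType.STRING.name());
        }
    }
    ```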




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org