You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2022/11/15 04:06:40 UTC

[nifi] branch main updated: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2bfefc3e5b NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate
2bfefc3e5b is described below

commit 2bfefc3e5bd126d41181011f9a86701cf3f2c828
Author: Mark Bean <ma...@gmail.com>
AuthorDate: Mon Oct 10 15:00:25 2022 -0400

    NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate
    
    NIFI-10243: fix typos
    
    NIFI-10243: re-ordered property in ControlRate
    
    NIFI-10243: minor updates to make code cleaner based on PR comments
    
    Signed-off-by: Nathan Gough <th...@gmail.com>
    
    This closes #6506.
---
 .../nifi/processors/standard/ControlRate.java      | 306 ++++++++++++++-------
 .../additionalDetails.html                         |  64 +++++
 .../nifi/processors/standard/TestControlRate.java  | 271 +++++++++++++++++-
 3 files changed, 540 insertions(+), 101 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 4634f041e1..34b9a8144b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -16,21 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -42,7 +27,6 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -59,6 +43,21 @@ import org.apache.nifi.util.timebuffer.LongEntityAccess;
 import org.apache.nifi.util.timebuffer.TimedBuffer;
 import org.apache.nifi.util.timebuffer.TimestampedLong;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
 @SideEffectFree
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -71,30 +70,58 @@ public class ControlRate extends AbstractProcessor {
     public static final String DATA_RATE = "data rate";
     public static final String FLOWFILE_RATE = "flowfile count";
     public static final String ATTRIBUTE_RATE = "attribute value";
+    public static final String DATA_OR_FLOWFILE_RATE = "data rate or flowfile count";
+
     public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE,
             "Rate is controlled by counting bytes transferred per time duration.");
     public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE,
-            "Rate is controlled by counting flowfiles transferred per time duration");
+            "Rate is controlled by counting FlowFiles transferred per time duration");
     public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
             "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
+    public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE,
+            "Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced");
 
     // based on testing to balance commits and 10,000 FF swap limit
     public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
+    private static final long DEFAULT_ACCRUAL_COUNT = -1L;
 
     public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
             .name("Rate Control Criteria")
+            .displayName("Rate Control Criteria")
             .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
             .required(true)
-            .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+            .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE, DATA_OR_FLOWFILE_RATE_VALUE)
             .defaultValue(DATA_RATE)
             .build();
     public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
             .name("Maximum Rate")
+            .displayName("Maximum Rate")
             .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
                     + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
-            .required(true)
+            .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+            .build();
+    public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
+            .name("Maximum Data Rate")
+            .displayName("Maximum Data Rate")
+            .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+                    + "Data Size (such as '1 MB') representing bytes per Time Duration.")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
+            .build();
+
+    public static final PropertyDescriptor MAX_COUNT_RATE = new PropertyDescriptor.Builder()
+            .name("Maximum FlowFile Rate")
+            .displayName("Maximum FlowFile Rate")
+            .description("The maximum rate at which FlowFiles should pass through this processor. The format of this property is expected to be a "
+                    + "positive integer representing FlowFiles count per Time Duration")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
             .build();
+
     public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
             .name("Rate Controlled Attribute")
             .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
@@ -103,6 +130,7 @@ public class ControlRate extends AbstractProcessor {
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(RATE_CONTROL_CRITERIA, ATTRIBUTE_RATE)
             .build();
     public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
             .name("Time Duration")
@@ -135,11 +163,13 @@ public class ControlRate extends AbstractProcessor {
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
 
-    private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Throttle> dataThrottleMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Throttle> countThrottleMap = new ConcurrentHashMap<>();
     private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
     private volatile String rateControlCriteria = null;
     private volatile String rateControlAttribute = null;
     private volatile String maximumRateStr = null;
+    private volatile String maximumCountRateStr = null;
     private volatile String groupingAttributeName = null;
     private volatile int timePeriodSeconds = 1;
 
@@ -147,9 +177,11 @@ public class ControlRate extends AbstractProcessor {
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(RATE_CONTROL_CRITERIA);
+        properties.add(TIME_PERIOD);
         properties.add(MAX_RATE);
+        properties.add(MAX_DATA_RATE);
+        properties.add(MAX_COUNT_RATE);
         properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
-        properties.add(TIME_PERIOD);
         properties.add(GROUPING_ATTRIBUTE_NAME);
         this.properties = Collections.unmodifiableList(properties);
 
@@ -173,32 +205,32 @@ public class ControlRate extends AbstractProcessor {
     protected Collection<ValidationResult> customValidate(final ValidationContext context) {
         final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
 
-        final Validator rateValidator;
         switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+            case DATA_OR_FLOWFILE_RATE:
+                // enforce validators to be sure properties are configured; they are only required for DATA_OR_FLOWFILE_RATE criteria
+                validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate(MAX_DATA_RATE.getDisplayName(), context.getProperty(MAX_DATA_RATE).getValue(), context));
+                validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate(MAX_COUNT_RATE.getDisplayName(), context.getProperty(MAX_COUNT_RATE).getValue(), context));
+                break;
             case DATA_RATE:
-                rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
+                validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
                 break;
+
             case ATTRIBUTE_RATE:
-                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
                 final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
                 if (rateAttr == null) {
                     validationResults.add(new ValidationResult.Builder()
                             .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
-                            .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
+                            .explanation("property must be set if using <Rate Control Criteria> of 'attribute value'")
                             .build());
                 }
-                break;
             case FLOWFILE_RATE:
+                validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
+                break;
             default:
-                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+                // no custom validation required
                 break;
         }
 
-        final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
-        if (!rateResult.isValid()) {
-            validationResults.add(rateResult);
-        }
-
         return validationResults;
     }
 
@@ -211,16 +243,37 @@ public class ControlRate extends AbstractProcessor {
                 || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
                 || descriptor.equals(TIME_PERIOD)) {
             // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
-            throttleMap.clear();
-        } else if (descriptor.equals(MAX_RATE)) {
+            dataThrottleMap.clear();
+            countThrottleMap.clear();
+        } else if (descriptor.equals(MAX_RATE) || descriptor.equals(MAX_DATA_RATE)) {
+            // MAX_RATE could affect either throttle map; MAX_DATA_RATE only affects data throttle map
             final long newRate;
-            if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) {
-                newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
-            } else {
+            if (newValue != null) {
+                if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) {
+                    newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
+                } else {
+                    newRate = Long.parseLong(newValue);
+                }
+                if (dataThrottleRequired()) {
+                    for (final Throttle throttle : dataThrottleMap.values()) {
+                        throttle.setMaxRate(newRate);
+                    }
+                }
+                if (countThrottleRequired()) {
+                    for (final Throttle throttle : countThrottleMap.values()) {
+                        throttle.setMaxRate(newRate);
+                    }
+                }
+            }
+        } else if (descriptor.equals(MAX_COUNT_RATE)) {
+            // MAX_COUNT_RATE only affects count throttle map
+            long newRate;
+            try {
                 newRate = Long.parseLong(newValue);
+            } catch (NumberFormatException nfe) {
+                newRate = -1;
             }
-
-            for (final Throttle throttle : throttleMap.values()) {
+            for (final Throttle throttle : countThrottleMap.values()) {
                 throttle.setMaxRate(newRate);
             }
         }
@@ -230,7 +283,16 @@ public class ControlRate extends AbstractProcessor {
     public void onScheduled(final ProcessContext context) {
         rateControlCriteria = context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase();
         rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-        maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase();
+        if (dataThrottleRequired()) {
+            // Use MAX_DATA_RATE only for DATA_OR_FLOWFILE_RATE criteria
+            maximumRateStr = rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)
+                    ? context.getProperty(MAX_DATA_RATE).getValue().toUpperCase() : context.getProperty(MAX_RATE).getValue().toUpperCase();
+        }
+        if (countThrottleRequired()) {
+            // Use MAX_COUNT_RATE only for DATA_OR_FLOWFILE_RATE criteria
+            maximumCountRateStr = rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)
+                    ? context.getProperty(MAX_COUNT_RATE).getValue() : context.getProperty(MAX_RATE).getValue();
+        }
         groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
         timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue();
     }
@@ -248,7 +310,14 @@ public class ControlRate extends AbstractProcessor {
         final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
         if (lastClearTime < throttleExpirationMillis) {
             if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
-                final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
+                final Set<Map.Entry<String, Throttle>> throttleSet = new HashSet<>();
+                if (dataThrottleRequired()) {
+                    throttleSet.addAll(dataThrottleMap.entrySet());
+                }
+                if (countThrottleRequired()) {
+                    throttleSet.addAll(countThrottleMap.entrySet());
+                }
+                final Iterator<Map.Entry<String, Throttle>> itr = throttleSet.iterator();
                 while (itr.hasNext()) {
                     final Map.Entry<String, Throttle> entry = itr.next();
                     final Throttle throttle = entry.getValue();
@@ -268,48 +337,67 @@ public class ControlRate extends AbstractProcessor {
         final ComponentLog logger = getLogger();
         for (FlowFile flowFile : flowFiles) {
             // call this to capture potential error
-            final long accrualAmount = getFlowFileAccrual(flowFile);
-            if (accrualAmount < 0) {
-                logger.error("Routing {} to 'failure' due to missing or invalid attribute", new Object[]{flowFile});
+            if (!isAccrualPossible(flowFile)) {
+                logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
                 session.transfer(flowFile, REL_FAILURE);
             } else {
-                logger.info("transferring {} to 'success'", new Object[]{flowFile});
+                logger.info("transferring {} to 'success'", flowFile);
                 session.transfer(flowFile, REL_SUCCESS);
             }
         }
     }
 
+    /*
+     * Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on
+     * flowfile attribute, the specified attribute must be present and must be a long integer.
+     */
+    private boolean isAccrualPossible(FlowFile flowFile) {
+        if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
+            final String attributeValue = flowFile.getAttribute(rateControlAttribute);
+            return attributeValue != null && POSITIVE_LONG_PATTERN.matcher(attributeValue).matches();
+        }
+        return true;
+    }
+
     /*
      * Determine the amount this FlowFile will incur against the maximum allowed rate.
-     * If the value returned is negative then the flowfile given is missing the required attribute
-     * or the attribute has an invalid value for accrual.
+     * This is applicable to data size accrual only
      */
-    private long getFlowFileAccrual(FlowFile flowFile) {
-        long rateValue;
-        switch (rateControlCriteria) {
-            case DATA_RATE:
-                rateValue = flowFile.getSize();
-                break;
-            case FLOWFILE_RATE:
-                rateValue = 1;
-                break;
-            case ATTRIBUTE_RATE:
-                final String attributeValue = flowFile.getAttribute(rateControlAttribute);
-                if (attributeValue == null) {
-                    return -1L;
-                }
+    private long getDataSizeAccrual(FlowFile flowFile) {
+        return flowFile.getSize();
+    }
 
-                if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
-                    return -1L;
-                }
-                rateValue = Long.parseLong(attributeValue);
-                break;
-            default:
-                throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + rateControlCriteria);
+    /*
+     * Determine the amount this FlowFile will incur against the maximum allowed rate.
+     * This is applicable to counting accruals, flowfiles or attributes
+     */
+    private long getCountAccrual(FlowFile flowFile) {
+        long rateValue = DEFAULT_ACCRUAL_COUNT;
+        if (rateControlCriteria.equals(FLOWFILE_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)) {
+            rateValue = 1;
+        }
+        if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
+            final String attributeValue = flowFile.getAttribute(rateControlAttribute);
+            if (attributeValue == null) {
+                return DEFAULT_ACCRUAL_COUNT;
+            }
+
+            if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
+                return DEFAULT_ACCRUAL_COUNT;
+            }
+            rateValue = Long.parseLong(attributeValue);
         }
         return rateValue;
     }
 
+    private boolean dataThrottleRequired() {
+        return rateControlCriteria != null && (rateControlCriteria.equals(DATA_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE));
+    }
+
+    private boolean countThrottleRequired() {
+        return rateControlCriteria != null && (rateControlCriteria.equals(FLOWFILE_RATE) || rateControlCriteria.equals(ATTRIBUTE_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE));
+    }
+
     private static class Throttle extends ReentrantLock {
 
         private final AtomicLong maxRate = new AtomicLong(1L);
@@ -336,6 +424,10 @@ public class ControlRate extends AbstractProcessor {
         }
 
         public boolean tryAdd(final long value) {
+            // value should never be negative, but if it is return immediately
+            if (value < 0) {
+                return false;
+            }
             final long now = System.currentTimeMillis();
             if (penalizationExpired > now) {
                 return false;
@@ -346,7 +438,7 @@ public class ControlRate extends AbstractProcessor {
             final TimestampedLong sum = timedBuffer.getAggregateValue(timePeriodMillis);
             if (sum != null && sum.getValue() >= maxRateValue) {
                 if (logger.isDebugEnabled()) {
-                    logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", new Object[]{sum.getValue(), sum.getTimestamp(), value});
+                    logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", sum.getValue(), sum.getTimestamp(), value);
                 }
                 return false;
             }
@@ -354,7 +446,7 @@ public class ControlRate extends AbstractProcessor {
             // Implement the Throttle penalization based on how much extra 'amountOver' was allowed through
             if (penalizationPeriod > 0) {
                 if (logger.isDebugEnabled()) {
-                    logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", new Object[]{penalizationPeriod});
+                    logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", penalizationPeriod);
                 }
                 penalizationExpired = now + penalizationPeriod;
                 penalizationPeriod = 0;
@@ -363,7 +455,7 @@ public class ControlRate extends AbstractProcessor {
 
             if (logger.isDebugEnabled()) {
                 logger.debug("current sum for throttle is {} at time {}, so allowing rate of {} through",
-                        new Object[]{sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value});
+                        sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value);
             }
 
             final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
@@ -374,7 +466,7 @@ public class ControlRate extends AbstractProcessor {
                 this.penalizationPeriod = (long) (timePeriodMillis * pct);
 
                 if (logger.isDebugEnabled()) {
-                    logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
+                    logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", value, penalizationPeriod);
                 }
             }
 
@@ -394,8 +486,7 @@ public class ControlRate extends AbstractProcessor {
 
         @Override
         public FlowFileFilterResult filter(FlowFile flowFile) {
-            long accrual = getFlowFileAccrual(flowFile);
-            if (accrual < 0) {
+            if (!isAccrualPossible(flowFile)) {
                 // this FlowFile is invalid for this configuration so let the processor deal with it
                 return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
             }
@@ -408,34 +499,59 @@ public class ControlRate extends AbstractProcessor {
                 groupName = DEFAULT_GROUP_ATTRIBUTE;
             }
 
-            Throttle throttle = throttleMap.get(groupName);
-            if (throttle == null) {
-                throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+            Throttle dataThrottle = dataThrottleMap.get(groupName);
+            Throttle countThrottle = countThrottleMap.get(groupName);
 
-                final long newRate;
-                if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
-                    newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
-                } else {
-                    newRate = Long.parseLong(maximumRateStr);
+            boolean dataThrottlingActive = false;
+            if (dataThrottleRequired()) {
+                if (dataThrottle == null) {
+                    dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
+                    dataThrottleMap.put(groupName, dataThrottle);
                 }
-                throttle.setMaxRate(newRate);
 
-                throttleMap.put(groupName, throttle);
+                dataThrottle.lock();
+                try {
+                    if (dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) {
+                        flowFilesInBatch++;
+                        if (flowFilesInBatch>= flowFilesPerBatch) {
+                            flowFilesInBatch = 0;
+                            return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                        } else {
+                            // only accept flowfile if additional count throttle does not need to run
+                            if (!countThrottleRequired()) {
+                                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                            }
+                        }
+                    } else {
+                        dataThrottlingActive = true;
+                    }
+                } finally {
+                    dataThrottle.unlock();
+                }
             }
 
-            throttle.lock();
-            try {
-                if (throttle.tryAdd(accrual)) {
-                    flowFilesInBatch += 1;
-                    if (flowFilesInBatch>= flowFilesPerBatch) {
-                        flowFilesInBatch = 0;
-                        return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
-                    } else {
-                        return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            // continue processing count throttle only if required and if data throttle is not already limiting flowfiles
+            if (countThrottleRequired() && !dataThrottlingActive) {
+                if (countThrottle == null) {
+                    countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
+                    countThrottleMap.put(groupName, countThrottle);
+                }
+                countThrottle.lock();
+                try {
+                    if (countThrottle.tryAdd(getCountAccrual(flowFile))) {
+                        flowFilesInBatch++;
+                        if (flowFilesInBatch>= flowFilesPerBatch) {
+                            flowFilesInBatch = 0;
+                            return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                        } else {
+                            return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                        }
                     }
+                } finally {
+                    countThrottle.unlock();
                 }
-            } finally {
-                throttle.unlock();
             }
 
             // If we are not using a grouping attribute, then no FlowFile will be able to continue on. So we can
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html
new file mode 100644
index 0000000000..7c8de2c7f5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html
@@ -0,0 +1,64 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ControlRate</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor throttles throughput of FlowFiles based on a configured rate. The rate can be specified as either a direct data rate (bytes per time period), or by
+        counting FlowFiles or a specific attribute value. In all cases, the time period for measurement is specified in the Time Duration property.
+    </p>
+    <p>The processor operates in one of four available modes. The mode is determined by the Rate Control Criteria property.
+    </p>
+    <p>
+        <table>
+            <tr>
+                <th>Mode</th>
+                <th>Description</th>
+            </tr>
+            <tr>
+                <td>Data Rate</td>
+                <td>The FlowFile content size is accumulated for all FlowFiles passing through this processor. FlowFiles are throttled to ensure a maximum overall data rate (bytes per time period)
+                    is not exceeded. The Maximum Rate property specifies the maximum bytes allowed per Time Duration.</td>
+            </tr>
+            <tr>
+                <td>FlowFile Count</td>
+                <td>FlowFiles are counted regardless of content size. No more than the specified number of FlowFiles pass through this processor in the given Time Duration. The Maximum Rate property
+                    specifies the maximum number of FlowFiles allowed per Time Duration.</td>
+            </tr>
+            <tr>
+                <td>Attribute Value</td>
+                <td>The value of an attribute is accumulated to determine overall rate. The Rate Controlled Attribute property specifies the attribute whose value will be accumulated. The value of
+                    the specified attribute is expected to be an integer. This mode is independent of overall FlowFile size and count.</td>
+            </tr>
+            <tr>
+                <td>Data Rate or FlowFile Count</td>
+                <td>This mode provides a combination of Data Rate and FlowFile Count. Both rates are accumulated and FlowFiles are throttled if either rate is exceeded. Both Maximum Data Rate and
+                    Maximum FlowFile Rate properties must be specified to determine content size and FlowFile count per Time Duration.</td>
+            </tr>
+        </table>
+    </p>
+    <p>If the Grouping Attribute property is specified, all rates are accumulated separately for unique values of the specified attribute. For example, assume Grouping Attribute property is
+        specified and the its value is "city". All FlowFiles containing a "city" attribute with value "Albuquerque" will have an accumulated rate calculated. A separate rate will be calculated
+        for all FlowFiles containing a "city" attribute with a value "Boston". In other words, separate rate calculations will be accumulated for all unique values of the Grouping Attribute.
+    </p>
+
+
+</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index fbecd767c4..0b68022622 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -31,6 +31,8 @@ import org.junit.jupiter.api.Test;
 
 public class TestControlRate {
 
+    private static final long ONE_SEC_PLUS = 1010L;
+
     @Test
     public void testLimitExceededThenOtherLimitNotExceeded() {
         // If we have flowfiles queued that have different values for the "Rate Controlled Attribute"
@@ -84,7 +86,7 @@ public class TestControlRate {
         runner.assertQueueNotEmpty();
 
         // we have sent 3 files and after 1 second, we should be able to send the 4th
-        Thread.sleep(1100L);
+        Thread.sleep(ONE_SEC_PLUS);
         runner.run();
         runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
         runner.assertQueueEmpty();
@@ -116,7 +118,7 @@ public class TestControlRate {
         runner.assertQueueNotEmpty();
 
         // we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group
-        Thread.sleep(1100L);
+        Thread.sleep(ONE_SEC_PLUS);
         runner.run(2);
         runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
         runner.assertQueueEmpty();
@@ -145,7 +147,7 @@ public class TestControlRate {
         runner.assertQueueNotEmpty();
 
         // we have sent 20 bytes and after 1 second, we should be able to send 20 more
-        Thread.sleep(1100L);
+        Thread.sleep(ONE_SEC_PLUS);
         runner.run(2, false);
         runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
         runner.assertQueueEmpty();
@@ -192,6 +194,28 @@ public class TestControlRate {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testAttributeDoesNotExist() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
+        runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "no.such.attribute");
+        runner.setProperty(ControlRate.MAX_RATE, "20000");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        createFlowFile(runner, 1000);
+        createFlowFile(runner, 3000);
+        createFlowFile(runner, 5000);
+        createFlowFile(runner, 20000);
+        createFlowFile(runner, 1000);
+
+        runner.run(5, false);
+
+        // all flowfiles transfer to failure since throttling attribute is not present
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_FAILURE, 5);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
+        runner.assertQueueEmpty();
+    }
+
     @Test
     public void testBadAttributeRate() {
         final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
@@ -211,7 +235,7 @@ public class TestControlRate {
     }
 
     @Test
-    public void testBatchLimit() throws InterruptedException {
+    public void testBatchLimit() {
         final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "5555");
@@ -240,7 +264,7 @@ public class TestControlRate {
     }
 
     @Test
-    public void testNonExistingGroupAttribute() throws InterruptedException {
+    public void testNonExistingGroupAttribute() {
         final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "2");
@@ -258,10 +282,245 @@ public class TestControlRate {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testIncreaseDataRate() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "11 B");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+        runner.enqueue("test data 4");
+        runner.enqueue("test data 5");
+        runner.enqueue("test data 6");
+
+        runner.run(7, true);
+
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // Increase rate after stopping processor. Previous count should remain since we are still inside time period
+        runner.setProperty(ControlRate.MAX_RATE, "33 B");
+        runner.run(7, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // after 1 second, we should be able to send the up to 3 more flowfiles
+        Thread.sleep(ONE_SEC_PLUS);
+        runner.run(7, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueEmpty();
+    }
+
+    @Test
+    public void testIncreaseFlowFileRate() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "1");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+        runner.enqueue("test data 4");
+        runner.enqueue("test data 5");
+        runner.enqueue("test data 6");
+
+        runner.run(7, true);
+
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // Increase rate after stopping processor. Previous count should remain since we are still inside time period
+        runner.setProperty(ControlRate.MAX_RATE, "3");
+        runner.run(7, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // after 1 second, we should be able to send the up to 3 more flowfiles
+        Thread.sleep(ONE_SEC_PLUS);
+        runner.run(7, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueEmpty();
+    }
+
+    @Test
+    public void testDataOrFlowFileCountLimitedByBytes() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        // Data rate will throttle before FlowFile count
+        runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
+        runner.setProperty(ControlRate.MAX_COUNT_RATE, "3");
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+
+        runner.run(4, false);
+
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+        runner.clearTransferState();
+
+        runner.run(4, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+        // we have sent 22 bytes and after 1 second, we should be able to send 22 more
+        Thread.sleep(ONE_SEC_PLUS);
+        runner.run(4, false);
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
+        runner.assertQueueEmpty();
+    }
+
+    @Test
+    public void testDataOrFlowFileCountLimitedByCount() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        // FlowFile count rate will throttle before data rate
+        runner.setProperty(ControlRate.MAX_DATA_RATE, "44 B"); // greater than all flowfiles to be queued
+        runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");  // limit to 1 flowfile per second
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+
+        runner.run(4, false);
+
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // we have sent 1 flowfile and after 1 second, we should be able to send 1 more
+        Thread.sleep(ONE_SEC_PLUS);
+        runner.run(4, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // we have sent 2 flowfile and after 1 second, we should be able to send 1 more
+        Thread.sleep(ONE_SEC_PLUS);
+        runner.run(4, false);
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueEmpty();
+    }
+
+    @Test
+    public void testDataOrFlowFileCountLimitedByBytesThenCount() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        // Data rate will throttle before FlowFile count
+        runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
+        runner.setProperty(ControlRate.MAX_COUNT_RATE, "5");
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+        runner.enqueue("4");
+        runner.enqueue("5");
+        runner.enqueue("6");
+        runner.enqueue("7");
+        runner.enqueue("8");
+
+        runner.run(10, false);
+
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+        runner.clearTransferState();
+
+        // we have sent 2 flowfile and after 1 second, we should be able to send more, now limited by flowfile count
+        Thread.sleep(ONE_SEC_PLUS);
+        runner.run(10, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 5);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+        runner.clearTransferState();
+
+        // after 1 second, we should be able to send the remaining flowfile
+        Thread.sleep(ONE_SEC_PLUS);
+        runner.run(10, false);
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueEmpty();
+    }
+
+    @Test
+    public void testValidate() {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
+        runner.assertNotValid(); // MAX_RATE is not set
+        runner.setProperty(ControlRate.MAX_RATE, "1");
+        runner.assertNotValid(); // MAX_RATE is not a byte size
+        runner.setProperty(ControlRate.MAX_RATE, "1 MB");
+        runner.assertValid();
+        runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
+        runner.assertValid(); // MAX_DATA_RATE is ignored
+        runner.removeProperty(ControlRate.MAX_RATE);
+        runner.assertNotValid(); // MAX_RATE is a required property for this rate control criteria
+
+        runner.clearProperties();
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
+        runner.assertNotValid(); // MAX_RATE is not set
+        runner.setProperty(ControlRate.MAX_RATE, "1 MB");
+        runner.assertNotValid(); // MAX_RATE is not an integer
+        runner.setProperty(ControlRate.MAX_RATE, "1");
+        runner.assertValid();
+        runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
+        runner.assertValid(); // MAX_COUNT_RATE is ignored
+        runner.removeProperty(ControlRate.MAX_RATE);
+        runner.assertNotValid(); // MAX_RATE is a required property for this rate control criteria
+
+        runner.clearProperties();
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
+        runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
+        runner.assertNotValid(); // MAX_RATE is not set
+        runner.setProperty(ControlRate.MAX_RATE, "1 MB");
+        runner.assertNotValid(); // MAX_RATE is not an integer
+        runner.setProperty(ControlRate.MAX_RATE, "1");
+        runner.assertValid();
+        runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
+        runner.assertValid(); // MAX_COUNT_RATE is ignored
+        runner.removeProperty(ControlRate.MAX_RATE);
+        runner.assertNotValid();// MAX_RATE is a required property for this rate control criteria
+        runner.setProperty(ControlRate.MAX_RATE, "1");
+        runner.removeProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME);
+        runner.assertNotValid();// RATE_CONTROL_ATTRIBUTE_NAME is a required property for this rate control criteria
+
+        runner.clearProperties();
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
+        runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
+        runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
+        runner.setProperty(ControlRate.MAX_COUNT_RATE, "2");
+        runner.assertValid(); // both MAX_DATA_RATE and MAX_COUNT_RATE are set
+        runner.removeProperty(ControlRate.MAX_COUNT_RATE);
+        runner.assertNotValid(); // MAX_COUNT_RATE is not set
+        runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
+        runner.removeProperty(ControlRate.MAX_DATA_RATE);
+        runner.assertNotValid();// MAX_DATA_RATE is not set
+        runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
+        runner.setProperty(ControlRate.MAX_RATE, "1 MB");
+        runner.assertValid(); // MAX_RATE is ignored
+    }
+
     private void createFlowFile(final TestRunner runner, final int value) {
         final Map<String, String> attributeMap = new HashMap<>();
         attributeMap.put("count", String.valueOf(value));
-        runner.enqueue(new byte[0], attributeMap);
+        byte[] data = "0123456789".getBytes();
+        runner.enqueue(data, attributeMap);
     }
     private void createFlowFileWithGroup(final TestRunner runner, final String group) {
         final Map<String, String> attributeMap = new HashMap<>();