You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by th...@apache.org on 2022/11/15 04:06:40 UTC
[nifi] branch main updated: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate
This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2bfefc3e5b NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate
2bfefc3e5b is described below
commit 2bfefc3e5bd126d41181011f9a86701cf3f2c828
Author: Mark Bean <ma...@gmail.com>
AuthorDate: Mon Oct 10 15:00:25 2022 -0400
NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate
NIFI-10243: fix typos
NIFI-10243: re-ordered property in ControlRate
NIFI-10243: minor updates to make code cleaner based on PR comments
Signed-off-by: Nathan Gough <th...@gmail.com>
This closes #6506.
---
.../nifi/processors/standard/ControlRate.java | 306 ++++++++++++++-------
.../additionalDetails.html | 64 +++++
.../nifi/processors/standard/TestControlRate.java | 271 +++++++++++++++++-
3 files changed, 540 insertions(+), 101 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 4634f041e1..34b9a8144b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -16,21 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -42,7 +27,6 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -59,6 +43,21 @@ import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
@SideEffectFree
@TriggerSerially
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -71,30 +70,58 @@ public class ControlRate extends AbstractProcessor {
public static final String DATA_RATE = "data rate";
public static final String FLOWFILE_RATE = "flowfile count";
public static final String ATTRIBUTE_RATE = "attribute value";
+ public static final String DATA_OR_FLOWFILE_RATE = "data rate or flowfile count";
+
public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE,
"Rate is controlled by counting bytes transferred per time duration.");
public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE,
- "Rate is controlled by counting flowfiles transferred per time duration");
+ "Rate is controlled by counting FlowFiles transferred per time duration");
public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
"Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
+ public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE,
+ "Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced");
// based on testing to balance commits and 10,000 FF swap limit
public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
+ private static final long DEFAULT_ACCRUAL_COUNT = -1L;
public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria")
+ .displayName("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
.required(true)
- .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+ .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE, DATA_OR_FLOWFILE_RATE_VALUE)
.defaultValue(DATA_RATE)
.build();
public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
.name("Maximum Rate")
+ .displayName("Maximum Rate")
.description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
- .required(true)
+ .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+ .dependsOn(RATE_CONTROL_CRITERIA, DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+ .build();
+ public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
+ .name("Maximum Data Rate")
+ .displayName("Maximum Data Rate")
+ .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+ + "Data Size (such as '1 MB') representing bytes per Time Duration.")
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
+ .build();
+
+ public static final PropertyDescriptor MAX_COUNT_RATE = new PropertyDescriptor.Builder()
+ .name("Maximum FlowFile Rate")
+ .displayName("Maximum FlowFile Rate")
+ .description("The maximum rate at which FlowFiles should pass through this processor. The format of this property is expected to be a "
+ + "positive integer representing FlowFiles count per Time Duration")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+ .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
.build();
+
public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("Rate Controlled Attribute")
.description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
@@ -103,6 +130,7 @@ public class ControlRate extends AbstractProcessor {
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .dependsOn(RATE_CONTROL_CRITERIA, ATTRIBUTE_RATE)
.build();
public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder()
.name("Time Duration")
@@ -135,11 +163,13 @@ public class ControlRate extends AbstractProcessor {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
- private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Throttle> dataThrottleMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Throttle> countThrottleMap = new ConcurrentHashMap<>();
private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
private volatile String rateControlCriteria = null;
private volatile String rateControlAttribute = null;
private volatile String maximumRateStr = null;
+ private volatile String maximumCountRateStr = null;
private volatile String groupingAttributeName = null;
private volatile int timePeriodSeconds = 1;
@@ -147,9 +177,11 @@ public class ControlRate extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(RATE_CONTROL_CRITERIA);
+ properties.add(TIME_PERIOD);
properties.add(MAX_RATE);
+ properties.add(MAX_DATA_RATE);
+ properties.add(MAX_COUNT_RATE);
properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
- properties.add(TIME_PERIOD);
properties.add(GROUPING_ATTRIBUTE_NAME);
this.properties = Collections.unmodifiableList(properties);
@@ -173,32 +205,32 @@ public class ControlRate extends AbstractProcessor {
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
- final Validator rateValidator;
switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+ case DATA_OR_FLOWFILE_RATE:
+ // enforce validators to be sure properties are configured; they are only required for DATA_OR_FLOWFILE_RATE criteria
+ validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate(MAX_DATA_RATE.getDisplayName(), context.getProperty(MAX_DATA_RATE).getValue(), context));
+ validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate(MAX_COUNT_RATE.getDisplayName(), context.getProperty(MAX_COUNT_RATE).getValue(), context));
+ break;
case DATA_RATE:
- rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
+ validationResults.add(StandardValidators.DATA_SIZE_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
break;
+
case ATTRIBUTE_RATE:
- rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
if (rateAttr == null) {
validationResults.add(new ValidationResult.Builder()
.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
- .explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'")
+ .explanation("property must be set if using <Rate Control Criteria> of 'attribute value'")
.build());
}
- break;
case FLOWFILE_RATE:
+ validationResults.add(StandardValidators.POSITIVE_LONG_VALIDATOR.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context));
+ break;
default:
- rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+ // no custom validation required
break;
}
- final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
- if (!rateResult.isValid()) {
- validationResults.add(rateResult);
- }
-
return validationResults;
}
@@ -211,16 +243,37 @@ public class ControlRate extends AbstractProcessor {
|| descriptor.equals(GROUPING_ATTRIBUTE_NAME)
|| descriptor.equals(TIME_PERIOD)) {
// if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
- throttleMap.clear();
- } else if (descriptor.equals(MAX_RATE)) {
+ dataThrottleMap.clear();
+ countThrottleMap.clear();
+ } else if (descriptor.equals(MAX_RATE) || descriptor.equals(MAX_DATA_RATE)) {
+ // MAX_RATE could affect either throttle map; MAX_DATA_RATE only affects data throttle map
final long newRate;
- if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) {
- newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
- } else {
+ if (newValue != null) {
+ if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches()) {
+ newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue();
+ } else {
+ newRate = Long.parseLong(newValue);
+ }
+ if (dataThrottleRequired()) {
+ for (final Throttle throttle : dataThrottleMap.values()) {
+ throttle.setMaxRate(newRate);
+ }
+ }
+ if (countThrottleRequired()) {
+ for (final Throttle throttle : countThrottleMap.values()) {
+ throttle.setMaxRate(newRate);
+ }
+ }
+ }
+ } else if (descriptor.equals(MAX_COUNT_RATE)) {
+ // MAX_COUNT_RATE only affects count throttle map
+ long newRate;
+ try {
newRate = Long.parseLong(newValue);
+ } catch (NumberFormatException nfe) {
+ newRate = -1;
}
-
- for (final Throttle throttle : throttleMap.values()) {
+ for (final Throttle throttle : countThrottleMap.values()) {
throttle.setMaxRate(newRate);
}
}
@@ -230,7 +283,16 @@ public class ControlRate extends AbstractProcessor {
public void onScheduled(final ProcessContext context) {
rateControlCriteria = context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase();
rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
- maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase();
+ if (dataThrottleRequired()) {
+ // Use MAX_DATA_RATE only for DATA_OR_FLOWFILE_RATE criteria
+ maximumRateStr = rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)
+ ? context.getProperty(MAX_DATA_RATE).getValue().toUpperCase() : context.getProperty(MAX_RATE).getValue().toUpperCase();
+ }
+ if (countThrottleRequired()) {
+ // Use MAX_COUNT_RATE only for DATA_OR_FLOWFILE_RATE criteria
+ maximumCountRateStr = rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)
+ ? context.getProperty(MAX_COUNT_RATE).getValue() : context.getProperty(MAX_RATE).getValue();
+ }
groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue();
}
@@ -248,7 +310,14 @@ public class ControlRate extends AbstractProcessor {
final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
if (lastClearTime < throttleExpirationMillis) {
if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
- final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator();
+ final Set<Map.Entry<String, Throttle>> throttleSet = new HashSet<>();
+ if (dataThrottleRequired()) {
+ throttleSet.addAll(dataThrottleMap.entrySet());
+ }
+ if (countThrottleRequired()) {
+ throttleSet.addAll(countThrottleMap.entrySet());
+ }
+ final Iterator<Map.Entry<String, Throttle>> itr = throttleSet.iterator();
while (itr.hasNext()) {
final Map.Entry<String, Throttle> entry = itr.next();
final Throttle throttle = entry.getValue();
@@ -268,48 +337,67 @@ public class ControlRate extends AbstractProcessor {
final ComponentLog logger = getLogger();
for (FlowFile flowFile : flowFiles) {
// call this to capture potential error
- final long accrualAmount = getFlowFileAccrual(flowFile);
- if (accrualAmount < 0) {
- logger.error("Routing {} to 'failure' due to missing or invalid attribute", new Object[]{flowFile});
+ if (!isAccrualPossible(flowFile)) {
+ logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
session.transfer(flowFile, REL_FAILURE);
} else {
- logger.info("transferring {} to 'success'", new Object[]{flowFile});
+ logger.info("transferring {} to 'success'", flowFile);
session.transfer(flowFile, REL_SUCCESS);
}
}
}
+ /*
+ * Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on
+ * flowfile attribute, the specified attribute must be present and must be a long integer.
+ */
+ private boolean isAccrualPossible(FlowFile flowFile) {
+ if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
+ final String attributeValue = flowFile.getAttribute(rateControlAttribute);
+ return attributeValue != null && POSITIVE_LONG_PATTERN.matcher(attributeValue).matches();
+ }
+ return true;
+ }
+
/*
* Determine the amount this FlowFile will incur against the maximum allowed rate.
- * If the value returned is negative then the flowfile given is missing the required attribute
- * or the attribute has an invalid value for accrual.
+ * This is applicable to data size accrual only
*/
- private long getFlowFileAccrual(FlowFile flowFile) {
- long rateValue;
- switch (rateControlCriteria) {
- case DATA_RATE:
- rateValue = flowFile.getSize();
- break;
- case FLOWFILE_RATE:
- rateValue = 1;
- break;
- case ATTRIBUTE_RATE:
- final String attributeValue = flowFile.getAttribute(rateControlAttribute);
- if (attributeValue == null) {
- return -1L;
- }
+ private long getDataSizeAccrual(FlowFile flowFile) {
+ return flowFile.getSize();
+ }
- if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
- return -1L;
- }
- rateValue = Long.parseLong(attributeValue);
- break;
- default:
- throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + rateControlCriteria);
+ /*
+ * Determine the amount this FlowFile will incur against the maximum allowed rate.
+ * This is applicable to counting accruals, flowfiles or attributes
+ */
+ private long getCountAccrual(FlowFile flowFile) {
+ long rateValue = DEFAULT_ACCRUAL_COUNT;
+ if (rateControlCriteria.equals(FLOWFILE_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE)) {
+ rateValue = 1;
+ }
+ if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
+ final String attributeValue = flowFile.getAttribute(rateControlAttribute);
+ if (attributeValue == null) {
+ return DEFAULT_ACCRUAL_COUNT;
+ }
+
+ if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
+ return DEFAULT_ACCRUAL_COUNT;
+ }
+ rateValue = Long.parseLong(attributeValue);
}
return rateValue;
}
+ private boolean dataThrottleRequired() {
+ return rateControlCriteria != null && (rateControlCriteria.equals(DATA_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE));
+ }
+
+ private boolean countThrottleRequired() {
+ return rateControlCriteria != null && (rateControlCriteria.equals(FLOWFILE_RATE) || rateControlCriteria.equals(ATTRIBUTE_RATE) || rateControlCriteria.equals(DATA_OR_FLOWFILE_RATE));
+ }
+
private static class Throttle extends ReentrantLock {
private final AtomicLong maxRate = new AtomicLong(1L);
@@ -336,6 +424,10 @@ public class ControlRate extends AbstractProcessor {
}
public boolean tryAdd(final long value) {
+ // value should never be negative, but if it is return immediately
+ if (value < 0) {
+ return false;
+ }
final long now = System.currentTimeMillis();
if (penalizationExpired > now) {
return false;
@@ -346,7 +438,7 @@ public class ControlRate extends AbstractProcessor {
final TimestampedLong sum = timedBuffer.getAggregateValue(timePeriodMillis);
if (sum != null && sum.getValue() >= maxRateValue) {
if (logger.isDebugEnabled()) {
- logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", new Object[]{sum.getValue(), sum.getTimestamp(), value});
+ logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", sum.getValue(), sum.getTimestamp(), value);
}
return false;
}
@@ -354,7 +446,7 @@ public class ControlRate extends AbstractProcessor {
// Implement the Throttle penalization based on how much extra 'amountOver' was allowed through
if (penalizationPeriod > 0) {
if (logger.isDebugEnabled()) {
- logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", new Object[]{penalizationPeriod});
+ logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", penalizationPeriod);
}
penalizationExpired = now + penalizationPeriod;
penalizationPeriod = 0;
@@ -363,7 +455,7 @@ public class ControlRate extends AbstractProcessor {
if (logger.isDebugEnabled()) {
logger.debug("current sum for throttle is {} at time {}, so allowing rate of {} through",
- new Object[]{sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value});
+ sum == null ? 0 : sum.getValue(), sum == null ? 0 : sum.getTimestamp(), value);
}
final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue();
@@ -374,7 +466,7 @@ public class ControlRate extends AbstractProcessor {
this.penalizationPeriod = (long) (timePeriodMillis * pct);
if (logger.isDebugEnabled()) {
- logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
+ logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", value, penalizationPeriod);
}
}
@@ -394,8 +486,7 @@ public class ControlRate extends AbstractProcessor {
@Override
public FlowFileFilterResult filter(FlowFile flowFile) {
- long accrual = getFlowFileAccrual(flowFile);
- if (accrual < 0) {
+ if (!isAccrualPossible(flowFile)) {
// this FlowFile is invalid for this configuration so let the processor deal with it
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
}
@@ -408,34 +499,59 @@ public class ControlRate extends AbstractProcessor {
groupName = DEFAULT_GROUP_ATTRIBUTE;
}
- Throttle throttle = throttleMap.get(groupName);
- if (throttle == null) {
- throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+ Throttle dataThrottle = dataThrottleMap.get(groupName);
+ Throttle countThrottle = countThrottleMap.get(groupName);
- final long newRate;
- if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
- newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
- } else {
- newRate = Long.parseLong(maximumRateStr);
+ boolean dataThrottlingActive = false;
+ if (dataThrottleRequired()) {
+ if (dataThrottle == null) {
+ dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+ dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
+ dataThrottleMap.put(groupName, dataThrottle);
}
- throttle.setMaxRate(newRate);
- throttleMap.put(groupName, throttle);
+ dataThrottle.lock();
+ try {
+ if (dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) {
+ flowFilesInBatch++;
+ if (flowFilesInBatch>= flowFilesPerBatch) {
+ flowFilesInBatch = 0;
+ return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+ } else {
+ // only accept flowfile if additional count throttle does not need to run
+ if (!countThrottleRequired()) {
+ return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ }
+ }
+ } else {
+ dataThrottlingActive = true;
+ }
+ } finally {
+ dataThrottle.unlock();
+ }
}
- throttle.lock();
- try {
- if (throttle.tryAdd(accrual)) {
- flowFilesInBatch += 1;
- if (flowFilesInBatch>= flowFilesPerBatch) {
- flowFilesInBatch = 0;
- return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
- } else {
- return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ // continue processing count throttle only if required and if data throttle is not already limiting flowfiles
+ if (countThrottleRequired() && !dataThrottlingActive) {
+ if (countThrottle == null) {
+ countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+ countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
+ countThrottleMap.put(groupName, countThrottle);
+ }
+ countThrottle.lock();
+ try {
+ if (countThrottle.tryAdd(getCountAccrual(flowFile))) {
+ flowFilesInBatch++;
+ if (flowFilesInBatch>= flowFilesPerBatch) {
+ flowFilesInBatch = 0;
+ return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+ } else {
+ return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ }
}
+ } finally {
+ countThrottle.unlock();
}
- } finally {
- throttle.unlock();
}
// If we are not using a grouping attribute, then no FlowFile will be able to continue on. So we can
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html
new file mode 100644
index 0000000000..7c8de2c7f5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html
@@ -0,0 +1,64 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>ControlRate</title>
+
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+ <p>This processor throttles throughput of FlowFiles based on a configured rate. The rate can be specified as either a direct data rate (bytes per time period), or by
+ counting FlowFiles or a specific attribute value. In all cases, the time period for measurement is specified in the Time Duration property.
+ </p>
+ <p>The processor operates in one of four available modes. The mode is determined by the Rate Control Criteria property.
+ </p>
+ <p>
+ <table>
+ <tr>
+ <th>Mode</th>
+ <th>Description</th>
+ </tr>
+ <tr>
+ <td>Data Rate</td>
+ <td>The FlowFile content size is accumulated for all FlowFiles passing through this processor. FlowFiles are throttled to ensure a maximum overall data rate (bytes per time period)
+ is not exceeded. The Maximum Rate property specifies the maximum bytes allowed per Time Duration.</td>
+ </tr>
+ <tr>
+ <td>FlowFile Count</td>
+ <td>FlowFiles are counted regardless of content size. No more than the specified number of FlowFiles pass through this processor in the given Time Duration. The Maximum Rate property
+ specifies the maximum number of FlowFiles allowed per Time Duration.</td>
+ </tr>
+ <tr>
+ <td>Attribute Value</td>
+ <td>The value of an attribute is accumulated to determine overall rate. The Rate Controlled Attribute property specifies the attribute whose value will be accumulated. The value of
+ the specified attribute is expected to be an integer. This mode is independent of overall FlowFile size and count.</td>
+ </tr>
+ <tr>
+ <td>Data Rate or FlowFile Count</td>
+ <td>This mode provides a combination of Data Rate and FlowFile Count. Both rates are accumulated and FlowFiles are throttled if either rate is exceeded. Both Maximum Data Rate and
+ Maximum FlowFile Rate properties must be specified to determine content size and FlowFile count per Time Duration.</td>
+ </tr>
+ </table>
+ </p>
+ <p>If the Grouping Attribute property is specified, all rates are accumulated separately for unique values of the specified attribute. For example, assume Grouping Attribute property is
+ specified and its value is "city". All FlowFiles containing a "city" attribute with value "Albuquerque" will have an accumulated rate calculated. A separate rate will be calculated
+ for all FlowFiles containing a "city" attribute with a value "Boston". In other words, separate rate calculations will be accumulated for all unique values of the Grouping Attribute.
+ </p>
+
+
+</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index fbecd767c4..0b68022622 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -31,6 +31,8 @@ import org.junit.jupiter.api.Test;
public class TestControlRate {
+ private static final long ONE_SEC_PLUS = 1010L;
+
@Test
public void testLimitExceededThenOtherLimitNotExceeded() {
// If we have flowfiles queued that have different values for the "Rate Controlled Attribute"
@@ -84,7 +86,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 3 files and after 1 second, we should be able to send the 4th
- Thread.sleep(1100L);
+ Thread.sleep(ONE_SEC_PLUS);
runner.run();
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
runner.assertQueueEmpty();
@@ -116,7 +118,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group
- Thread.sleep(1100L);
+ Thread.sleep(ONE_SEC_PLUS);
runner.run(2);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertQueueEmpty();
@@ -145,7 +147,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 20 bytes and after 1 second, we should be able to send 20 more
- Thread.sleep(1100L);
+ Thread.sleep(ONE_SEC_PLUS);
runner.run(2, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertQueueEmpty();
@@ -192,6 +194,28 @@ public class TestControlRate {
runner.assertQueueEmpty();
}
+ @Test
+ public void testAttributeDoesNotExist() throws InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
+ runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "no.such.attribute");
+ runner.setProperty(ControlRate.MAX_RATE, "20000");
+ runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+ createFlowFile(runner, 1000);
+ createFlowFile(runner, 3000);
+ createFlowFile(runner, 5000);
+ createFlowFile(runner, 20000);
+ createFlowFile(runner, 1000);
+
+ runner.run(5, false);
+
+ // all flowfiles transfer to failure since throttling attribute is not present
+ runner.assertAllFlowFilesTransferred(ControlRate.REL_FAILURE, 5);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
+ runner.assertQueueEmpty();
+ }
+
@Test
public void testBadAttributeRate() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
@@ -211,7 +235,7 @@ public class TestControlRate {
}
@Test
- public void testBatchLimit() throws InterruptedException {
+ public void testBatchLimit() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "5555");
@@ -240,7 +264,7 @@ public class TestControlRate {
}
@Test
- public void testNonExistingGroupAttribute() throws InterruptedException {
+ public void testNonExistingGroupAttribute() {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "2");
@@ -258,10 +282,245 @@ public class TestControlRate {
runner.assertQueueEmpty();
}
+ @Test
+ public void testIncreaseDataRate() throws InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
+ runner.setProperty(ControlRate.MAX_RATE, "11 B");
+ runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+ runner.enqueue("test data 1");
+ runner.enqueue("test data 2");
+ runner.enqueue("test data 3");
+ runner.enqueue("test data 4");
+ runner.enqueue("test data 5");
+ runner.enqueue("test data 6");
+
+ runner.run(7, true);
+
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+
+ // Increase rate after stopping processor. Previous count should remain since we are still inside time period
+ runner.setProperty(ControlRate.MAX_RATE, "33 B");
+ runner.run(7, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+
+ // after 1 second, we should be able to send up to 3 more flowfiles
+ Thread.sleep(ONE_SEC_PLUS);
+ runner.run(7, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueEmpty();
+ }
+
+ @Test
+ public void testIncreaseFlowFileRate() throws InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
+ runner.setProperty(ControlRate.MAX_RATE, "1");
+ runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+ runner.enqueue("test data 1");
+ runner.enqueue("test data 2");
+ runner.enqueue("test data 3");
+ runner.enqueue("test data 4");
+ runner.enqueue("test data 5");
+ runner.enqueue("test data 6");
+
+ runner.run(7, true);
+
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+
+ // Increase rate after stopping processor. Previous count should remain since we are still inside time period
+ runner.setProperty(ControlRate.MAX_RATE, "3");
+ runner.run(7, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+
+ // after 1 second, we should be able to send up to 3 more flowfiles
+ Thread.sleep(ONE_SEC_PLUS);
+ runner.run(7, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueEmpty();
+ }
+
+ @Test
+ public void testDataOrFlowFileCountLimitedByBytes() throws InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
+ runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ // Data rate will throttle before FlowFile count
+ runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
+ runner.setProperty(ControlRate.MAX_COUNT_RATE, "3");
+
+ runner.enqueue("test data 1");
+ runner.enqueue("test data 2");
+ runner.enqueue("test data 3");
+
+ runner.run(4, false);
+
+ runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+ runner.clearTransferState();
+
+ runner.run(4, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+ // we have sent 22 bytes and after 1 second, we should be able to send 22 more
+ Thread.sleep(ONE_SEC_PLUS);
+ runner.run(4, false);
+ runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
+ runner.assertQueueEmpty();
+ }
+
+ @Test
+ public void testDataOrFlowFileCountLimitedByCount() throws InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
+ runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ // FlowFile count rate will throttle before data rate
+ runner.setProperty(ControlRate.MAX_DATA_RATE, "44 B"); // greater than all flowfiles to be queued
+ runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); // limit to 1 flowfile per second
+
+ runner.enqueue("test data 1");
+ runner.enqueue("test data 2");
+ runner.enqueue("test data 3");
+
+ runner.run(4, false);
+
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+
+ // we have sent 1 flowfile and after 1 second, we should be able to send 1 more
+ Thread.sleep(ONE_SEC_PLUS);
+ runner.run(4, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+
+ // we have sent 2 flowfiles and after 1 second, we should be able to send 1 more
+ Thread.sleep(ONE_SEC_PLUS);
+ runner.run(4, false);
+ runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueEmpty();
+ }
+
+ @Test
+ public void testDataOrFlowFileCountLimitedByBytesThenCount() throws InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
+ runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ // Data rate will throttle before FlowFile count
+ runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
+ runner.setProperty(ControlRate.MAX_COUNT_RATE, "5");
+
+ runner.enqueue("test data 1");
+ runner.enqueue("test data 2");
+ runner.enqueue("test data 3");
+ runner.enqueue("4");
+ runner.enqueue("5");
+ runner.enqueue("6");
+ runner.enqueue("7");
+ runner.enqueue("8");
+
+ runner.run(10, false);
+
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+ runner.clearTransferState();
+
+ // we have sent 2 flowfiles and after 1 second, we should be able to send more, now limited by flowfile count
+ Thread.sleep(ONE_SEC_PLUS);
+ runner.run(10, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 5);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+ runner.clearTransferState();
+
+ // after 1 second, we should be able to send the remaining flowfile
+ Thread.sleep(ONE_SEC_PLUS);
+ runner.run(10, false);
+ runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueEmpty();
+ }
+
+ @Test
+ public void testValidate() {
+ final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
+ runner.assertNotValid(); // MAX_RATE is not set
+ runner.setProperty(ControlRate.MAX_RATE, "1");
+ runner.assertNotValid(); // MAX_RATE is not a byte size
+ runner.setProperty(ControlRate.MAX_RATE, "1 MB");
+ runner.assertValid();
+ runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
+ runner.assertValid(); // MAX_DATA_RATE is ignored
+ runner.removeProperty(ControlRate.MAX_RATE);
+ runner.assertNotValid(); // MAX_RATE is a required property for this rate control criteria
+
+ runner.clearProperties();
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
+ runner.assertNotValid(); // MAX_RATE is not set
+ runner.setProperty(ControlRate.MAX_RATE, "1 MB");
+ runner.assertNotValid(); // MAX_RATE is not an integer
+ runner.setProperty(ControlRate.MAX_RATE, "1");
+ runner.assertValid();
+ runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
+ runner.assertValid(); // MAX_COUNT_RATE is ignored
+ runner.removeProperty(ControlRate.MAX_RATE);
+ runner.assertNotValid(); // MAX_RATE is a required property for this rate control criteria
+
+ runner.clearProperties();
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
+ runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
+ runner.assertNotValid(); // MAX_RATE is not set
+ runner.setProperty(ControlRate.MAX_RATE, "1 MB");
+ runner.assertNotValid(); // MAX_RATE is not an integer
+ runner.setProperty(ControlRate.MAX_RATE, "1");
+ runner.assertValid();
+ runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
+ runner.assertValid(); // MAX_COUNT_RATE is ignored
+ runner.removeProperty(ControlRate.MAX_RATE);
+ runner.assertNotValid();// MAX_RATE is a required property for this rate control criteria
+ runner.setProperty(ControlRate.MAX_RATE, "1");
+ runner.removeProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME);
+ runner.assertNotValid();// RATE_CONTROL_ATTRIBUTE_NAME is a required property for this rate control criteria
+
+ runner.clearProperties();
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
+ runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
+ runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
+ runner.setProperty(ControlRate.MAX_COUNT_RATE, "2");
+ runner.assertValid(); // both MAX_DATA_RATE and MAX_COUNT_RATE are set
+ runner.removeProperty(ControlRate.MAX_COUNT_RATE);
+ runner.assertNotValid(); // MAX_COUNT_RATE is not set
+ runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");
+ runner.removeProperty(ControlRate.MAX_DATA_RATE);
+ runner.assertNotValid();// MAX_DATA_RATE is not set
+ runner.setProperty(ControlRate.MAX_DATA_RATE, "1 MB");
+ runner.setProperty(ControlRate.MAX_RATE, "1 MB");
+ runner.assertValid(); // MAX_RATE is ignored
+ }
+
private void createFlowFile(final TestRunner runner, final int value) {
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", String.valueOf(value));
- runner.enqueue(new byte[0], attributeMap);
+ byte[] data = "0123456789".getBytes();
+ runner.enqueue(data, attributeMap);
}
private void createFlowFileWithGroup(final TestRunner runner, final String group) {
final Map<String, String> attributeMap = new HashMap<>();