Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/10 21:42:46 UTC

[GitHub] [nifi] markobean opened a new pull request, #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

markobean opened a new pull request, #6506:
URL: https://github.com/apache/nifi/pull/6506

   # Summary
   This PR modifies the ControlRate processor to introduce a new mode of operation. Throttling can now be applied on Data Rate and FlowFile Rate together; whichever limit is reached first triggers throttling. Documentation was improved and unit testing expanded to cover additional cases. Backward compatibility is maintained.
   
   [NIFI-10243](https://issues.apache.org/jira/browse/NIFI-10243)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [X] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [X] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [X] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [X] Pull Request based on current revision of the `main` branch
   - [X] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Unit tests expanded to cover additional cases - both new modes of operation and older modes that were not adequately tested.
   
   Instantiated processor in a running NiFi to perform real world testing.
   
   Viewed documentation including new Additional Details page to verify proper representation.
   
   ### Build
   
   - [X] Build completed using `mvn clean install -P contrib-check`
     - [X] JDK 8
     - [X] JDK 11
     - [X] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [X] Documentation formatting appears as expected in rendered files
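
The combined mode described in the Summary ("the first rate to be reached begins throttling") can be pictured with a small standalone sketch. This is not the ControlRate implementation: the class names `WindowCounter` and `DualThrottleSketch`, and the explicit `startNewWindow()` call, are hypothetical and stand in for whatever the processor does internally per Time Duration.

```java
// Hypothetical sketch only; none of these names come from the NiFi code base.
final class WindowCounter {
    private final long maxPerWindow;
    private long used;

    WindowCounter(long maxPerWindow) {
        this.maxPerWindow = maxPerWindow;
    }

    // True if adding 'amount' keeps the running total within the limit.
    boolean hasRoom(long amount) {
        return used + amount <= maxPerWindow;
    }

    void add(long amount) {
        used += amount;
    }

    void reset() {
        used = 0;
    }
}

final class DualThrottleSketch {
    private final WindowCounter bytes;
    private final WindowCounter count;

    DualThrottleSketch(long maxBytesPerWindow, long maxFlowFilesPerWindow) {
        this.bytes = new WindowCounter(maxBytesPerWindow);
        this.count = new WindowCounter(maxFlowFilesPerWindow);
    }

    // Admit an item only when BOTH limits still have room, so whichever
    // limit is exhausted first is the one that starts the throttling.
    boolean tryAdmit(long sizeBytes) {
        if (!bytes.hasRoom(sizeBytes) || !count.hasRoom(1)) {
            return false;
        }
        bytes.add(sizeBytes);
        count.add(1);
        return true;
    }

    // Assumed to be called once per time period; the real processor tracks time itself.
    void startNewWindow() {
        bytes.reset();
        count.reset();
    }
}
```

With limits of 33 bytes and 2 items per window, two 11-byte items are admitted and a third is held back by the count limit even though the byte budget is not yet used up.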
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on PR #6506:
URL: https://github.com/apache/nifi/pull/6506#issuecomment-1309629082

   Thanks for making the updates and noting the detail about the minimum validation setting @markobean, the changes look good.
   
   The new test methods push the total test time to over 12 seconds for standard execution, which places the class among the slower unit tests and contributes to an overall slow build process. I have an idea for updating the unit test: extend ControlRate in the test class and implement an alternative current time method. With that background, I'm not opposed to moving forward with this PR, and I can evaluate an improvement in a subsequent Jira issue.
   
   I will defer to @thenatog for final review at this point, thanks!
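
The idea mentioned above, subclassing for tests and supplying an alternative current time method, might look roughly like the following. This is a generic sketch under stated assumptions, not the eventual NiFi change: `WindowedLimiter`, `ManualClockLimiter`, and the protected `currentTimeMillis()` hook are all hypothetical names, and the real ControlRate class may expose time differently.

```java
// Hypothetical stand-in for a time-windowed throttle; not NiFi code.
class WindowedLimiter {
    private final long windowMillis;
    private final long maxPerWindow;
    private long windowStart = -1; // set lazily on first use
    private long used;

    WindowedLimiter(long windowMillis, long maxPerWindow) {
        this.windowMillis = windowMillis;
        this.maxPerWindow = maxPerWindow;
    }

    // Overridable time source: production code reads the wall clock.
    protected long currentTimeMillis() {
        return System.currentTimeMillis();
    }

    boolean tryAdd(long amount) {
        final long now = currentTimeMillis();
        // Start a fresh window on first use or once the period has elapsed.
        if (windowStart < 0 || now - windowStart >= windowMillis) {
            windowStart = now;
            used = 0;
        }
        if (used + amount > maxPerWindow) {
            return false;
        }
        used += amount;
        return true;
    }
}

// Test-only subclass: time moves only when the test says so, so no Thread.sleep is needed.
class ManualClockLimiter extends WindowedLimiter {
    private long now; // starts at 0

    ManualClockLimiter(long windowMillis, long maxPerWindow) {
        super(windowMillis, maxPerWindow);
    }

    @Override
    protected long currentTimeMillis() {
        return now;
    }

    void advance(long millis) {
        now += millis;
    }
}
```

A test can then call `advance(1000)` instead of sleeping for a second, which keeps the 1 second validator minimum intact while removing the real delay.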


[GitHub] [nifi] thenatog commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
thenatog commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014410492


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java:
##########
@@ -258,10 +282,245 @@ public void testNonExistingGroupAttribute() throws InterruptedException {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testIncreaseDataRate() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "11 B");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+        runner.enqueue("test data 4");
+        runner.enqueue("test data 5");
+        runner.enqueue("test data 6");
+
+        runner.run(7, true);
+
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // Increase rate after stopping processor. Previous count should remain since we are still inside time period
+        runner.setProperty(ControlRate.MAX_RATE, "33 B");
+        runner.run(7, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // after 1 second, we should be able to send the up to 3 more flowfiles
+        Thread.sleep(ONE_SEC_PLUS);

Review Comment:
   Is there a way to reduce the wait to less than a second? I see that TIME_PERIOD has a validator that enforces a 1 second minimum, but perhaps we could override this, and any other spots that expect a 1 second period, to shorten the tests overall. Build time with tests on my personal machine is already pretty long, and builds on GitHub Actions frequently time out, so we're trying to trim as much test time as possible. @exceptionfactory may have some advice.



[GitHub] [nifi] thenatog commented on pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
thenatog commented on PR #6506:
URL: https://github.com/apache/nifi/pull/6506#issuecomment-1314739162

   +1, will merge. Maybe in the future we can find some test efficiencies if they become a problem.


[GitHub] [nifi] thenatog commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
thenatog commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014337529


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -71,30 +70,57 @@ public class ControlRate extends AbstractProcessor {
     public static final String DATA_RATE = "data rate";
     public static final String FLOWFILE_RATE = "flowfile count";
     public static final String ATTRIBUTE_RATE = "attribute value";
+    public static final String DATA_OR_FLOWFILE_RATE = "data rate or flowfile count";
+
     public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE,
             "Rate is controlled by counting bytes transferred per time duration.");
     public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE,
-            "Rate is controlled by counting flowfiles transferred per time duration");
+            "Rate is controlled by counting FlowFiles transferred per time duration");
     public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
             "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
+    public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE,
+            "Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced");
 
     // based on testing to balance commits and 10,000 FF swap limit
     public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
 
     public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
             .name("Rate Control Criteria")
+            .displayName("Rate Control Criteria")
             .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
             .required(true)
-            .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+            .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE, DATA_OR_FLOWFILE_RATE_VALUE)
             .defaultValue(DATA_RATE)
             .build();
     public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
             .name("Maximum Rate")
+            .displayName("Maximum Rate")
             .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
                     + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
-            .required(true)
+            .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+            .build();
+    public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
+            .name("Maximum Data Rate")
+            .displayName("Maximum Data Rate")
+            .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+                    + "Data Size (such as '1 MB') representing bytes per Time Duration.")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
             .build();
+
+    public static final PropertyDescriptor MAX_COUNT_RATE = new PropertyDescriptor.Builder()
+            .name("Maximum FlowFile Rate")
+            .displayName("Maximum FlowFile Rate")
+            .description("The maximum rate at which FlowFiles should pass through this processor. The format of this property is expected to be a "
+                    + "positive integer representing FlowFiles count per Time Duration")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
+            .build();
+
     public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
             .name("Rate Controlled Attribute")
             .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "

Review Comment:
   Should the RATE_CONTROL_ATTRIBUTE_NAME descriptor also dependsOn 'ATTRIBUTE_RATE_VALUE' in the Rate Control Criteria or should it always be available to configure?



[GitHub] [nifi] markobean commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014414545


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -71,30 +70,57 @@ public class ControlRate extends AbstractProcessor {
     public static final String DATA_RATE = "data rate";

Review Comment:
   I believe this is a breaking change that would affect existing flows. At least, when I updated the values to have a different case, e.g. "Data Rate", all unit tests failed. I don't see a big deal with this, and to avoid potential problems I think it should be left as-is.



[GitHub] [nifi] markobean commented on pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on PR #6506:
URL: https://github.com/apache/nifi/pull/6506#issuecomment-1309455563

   @exceptionfactory The time period cannot be reduced to 500 ms. The validator requires a minimum value of 1 sec. This is because the measured rate becomes less accurate as the time period shrinks, especially in the sub-second range (and worse still on a busy system). In order to change the unit tests to a smaller value, the validator would have to change. I do not think that is an advisable approach: allowing configurations that could mislead users about accuracy just to shorten unit tests. In order to test properly, the sleep time is a necessary evil.
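
The accuracy argument above can be illustrated with simple arithmetic. The sketch below assumes a fixed timing jitter of 50 ms per window; that figure is purely hypothetical (not measured on any NiFi system) and the class name `WindowJitterSketch` is made up for this example. It only shows how the same absolute jitter takes up a growing share of the window as the window shrinks.

```java
// Hypothetical illustration; the 50 ms jitter is an assumed value, not a measurement.
final class WindowJitterSketch {

    // Fraction of the window that a given timing jitter represents.
    static double jitterShare(long windowMillis, long jitterMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        return (double) jitterMillis / windowMillis;
    }

    public static void main(String[] args) {
        final long assumedJitterMillis = 50; // assumption for illustration only
        for (long window : new long[] {1000, 500, 100}) {
            System.out.printf("window=%d ms -> jitter is %.0f%% of the window%n",
                    window, 100 * jitterShare(window, assumedJitterMillis));
        }
    }
}
```

Under that assumption the jitter is one twentieth of a 1 second window, one tenth of a 500 ms window, and half of a 100 ms window, which is the kind of degradation the 1 second minimum guards against.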


[GitHub] [nifi] michael81877 commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
michael81877 commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r991917936


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -211,16 +241,37 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
                 || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
                 || descriptor.equals(TIME_PERIOD)) {
             // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map.
-            throttleMap.clear();
-        } else if (descriptor.equals(MAX_RATE)) {
+            dataThrottleMap.clear();
+            countThrottleMap.clear();
+        } else if (descriptor.equals(MAX_RATE) || descriptor.equals(MAX_DATA_RATE)) {
+            // MAX_RATE could affect eitehr throttle map; MAX_DATA_RATE only affects data throttle map

Review Comment:
   ```suggestion
               // MAX_RATE could affect either throttle map; MAX_DATA_RATE only affects data throttle map
   ```



[GitHub] [nifi] markobean commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1018464360


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -268,48 +336,67 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final ComponentLog logger = getLogger();
         for (FlowFile flowFile : flowFiles) {
             // call this to capture potential error
-            final long accrualAmount = getFlowFileAccrual(flowFile);
-            if (accrualAmount < 0) {
-                logger.error("Routing {} to 'failure' due to missing or invalid attribute", new Object[]{flowFile});
+            if (!isAccrualPossible(flowFile)) {
+                logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
                 session.transfer(flowFile, REL_FAILURE);
             } else {
-                logger.info("transferring {} to 'success'", new Object[]{flowFile});
+                logger.info("transferring {} to 'success'", flowFile);
                 session.transfer(flowFile, REL_SUCCESS);
             }
         }
     }
 
+    /*
+     * Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on
+     * flowfile attribute, the specified attribute must be present and must be a long integer.
+     */
+    private boolean isAccrualPossible(FlowFile flowFile) {
+        if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
+            final String attributeValue = flowFile.getAttribute(rateControlAttribute);
+            return attributeValue != null && POSITIVE_LONG_PATTERN.matcher(attributeValue).matches();
+        }
+        return true;
+    }
+
     /*
      * Determine the amount this FlowFile will incur against the maximum allowed rate.
-     * If the value returned is negative then the flowfile given is missing the required attribute
-     * or the attribute has an invalid value for accrual.
+     * This is applicable to data size accrual only
      */
-    private long getFlowFileAccrual(FlowFile flowFile) {
-        long rateValue;
-        switch (rateControlCriteria) {
-            case DATA_RATE:
-                rateValue = flowFile.getSize();
-                break;
-            case FLOWFILE_RATE:
-                rateValue = 1;
-                break;
-            case ATTRIBUTE_RATE:
-                final String attributeValue = flowFile.getAttribute(rateControlAttribute);
-                if (attributeValue == null) {
-                    return -1L;
-                }
+    private long getDataSizeAccrual(FlowFile flowFile) {
+        return flowFile.getSize();
+    }
 
-                if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
-                    return -1L;
-                }
-                rateValue = Long.parseLong(attributeValue);
-                break;
-            default:
-                throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + rateControlCriteria);
+    /*
+     * Determine the amount this FlowFile will incur against the maximum allowed rate.
+     * This is applicable to counting accruals, flowfiles or attributes
+     */
+    private long getCountAccrual(FlowFile flowFile) {
+        long rateValue = -1L;

Review Comment:
   Done.



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -408,34 +498,59 @@ public FlowFileFilterResult filter(FlowFile flowFile) {
                 groupName = DEFAULT_GROUP_ATTRIBUTE;
             }
 
-            Throttle throttle = throttleMap.get(groupName);
-            if (throttle == null) {
-                throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+            Throttle dataThrottle = dataThrottleMap.get(groupName);
+            Throttle countThrottle = countThrottleMap.get(groupName);
 
-                final long newRate;
-                if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
-                    newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
-                } else {
-                    newRate = Long.parseLong(maximumRateStr);
+            boolean dataThrottlingActive = false;
+            if (dataThrottleRequired()) {
+                if (dataThrottle == null) {
+                    dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
+                    dataThrottleMap.put(groupName, dataThrottle);
                 }
-                throttle.setMaxRate(newRate);
 
-                throttleMap.put(groupName, throttle);
+                dataThrottle.lock();
+                try {
+                    if (dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) {
+                        flowFilesInBatch += 1;

Review Comment:
   Done.



[GitHub] [nifi] michael81877 commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
michael81877 commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r991924790


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ControlRate/additionalDetails.html:
##########
@@ -0,0 +1,64 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ControlRate</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+    <p>This processor throttles throughput of FlowFiles based on a configured rate. The rate can be specified as either a direct data rate (bytes per time period), or by
+        counting FlowFiles or a specific attribute value. In all cases, the time period for measurement is specified in the Time Duration property.
+    </p>
+    <p>The processor operates in one of four available modes. The mode is determined by the Rate Control Criteria property.
+    </p>
+    <p>
+        <table>
+            <tr>
+                <th>Mode</th>
+                <th>Description</th>
+            </tr>
+            <tr>
+                <td>Data Rate</td>
+                <td>The FlowFile content size is accumulated for all FlowFiles passing through this processor. FlowFiles are throttled to ensure a maximum overall data rate (bytes per time period)
+                    is not exceeded. The Maximum Rate property specifies the maximum bytes allowed per Time Duration.</td>
+            </tr>
+            <tr>
+                <td>FlowFile Count</td>
+                <td>FlowFiles are counted regardless of content size. No more than the specified number of FlowFiles pass through this processor in the given Time Duration. The Maximum Rate property
+                    specifies the maximum number of FlowFiles allowed per Time Duration.</td>
+            </tr>
+            <tr>
+                <td>Attribute Value</td>
+                <td>The value of an attribute is accumulated to determine overall rate. The Rate Controlled Attribute property specifies the attribute whose value will be accumulated. The value of
+                    the specified attribute is expected to be an integer. This mode is independent of overall FlowFile size and count.</td>
+            </tr>
+            <tr>
+                <td>Data Rate or FLowFile Count</td>

Review Comment:
   ```suggestion
                   <td>Data Rate or FlowFile Count</td>
   ```



[GitHub] [nifi] thenatog commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
thenatog commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014318046


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java:
##########
@@ -258,10 +282,245 @@ public void testNonExistingGroupAttribute() throws InterruptedException {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testIncreaseDataRate() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "11 B");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+        runner.enqueue("test data 4");
+        runner.enqueue("test data 5");
+        runner.enqueue("test data 6");
+
+        runner.run(7, true);
+
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // Increase rate after stopping processor. Previous count should remain since we are still inside time period
+        runner.setProperty(ControlRate.MAX_RATE, "33 B");
+        runner.run(7, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // after 1 second, we should be able to send the up to 3 more flowfiles
+        Thread.sleep(ONE_SEC_PLUS);

Review Comment:
   I'm wary of adding tests that use thread sleeps. They are frequently unreliable when run on GitHub Actions infrastructure. The obvious problem is that it's hard to test a rate control feature without a delay like this. Anyway, I thought it was worth mentioning, as we're trying to reduce the number of flaky tests we have.



[GitHub] [nifi] markobean commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014422046


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java:
##########
@@ -258,10 +282,245 @@ public void testNonExistingGroupAttribute() throws InterruptedException {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testIncreaseDataRate() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "11 B");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+        runner.enqueue("test data 4");
+        runner.enqueue("test data 5");
+        runner.enqueue("test data 6");
+
+        runner.run(7, true);
+
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // Increase rate after stopping processor. Previous count should remain since we are still inside time period
+        runner.setProperty(ControlRate.MAX_RATE, "33 B");
+        runner.run(7, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // after 1 second, we should be able to send the up to 3 more flowfiles
+        Thread.sleep(ONE_SEC_PLUS);

Review Comment:
   I really like the idea of reducing build times. However, I'm not aware of how to override a validator - nor do I believe that is a good idea. Such an override seems like it may introduce false positive results inadvertently. 
   
   I would prefer to take up the issue of reducing build times in a separate JIRA ticket as it is not directly related to the feature proposed here.



[GitHub] [nifi] thenatog closed pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
thenatog closed pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate
URL: https://github.com/apache/nifi/pull/6506


[GitHub] [nifi] markobean commented on pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on PR #6506:
URL: https://github.com/apache/nifi/pull/6506#issuecomment-1305579098

   I think refactoring the inner classes is scope creep. Also, those classes are scoped as private implying there is specific intent to have them as inner classes. I recommend leaving them as-is or opening a separate ticket for further evaluation and work.
   
   Thanks for the review @thenatog. I believe all your comments have been resolved. Is this ready to be merged?


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1018429196


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -268,48 +336,67 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final ComponentLog logger = getLogger();
         for (FlowFile flowFile : flowFiles) {
             // call this to capture potential error
-            final long accrualAmount = getFlowFileAccrual(flowFile);
-            if (accrualAmount < 0) {
-                logger.error("Routing {} to 'failure' due to missing or invalid attribute", new Object[]{flowFile});
+            if (!isAccrualPossible(flowFile)) {
+                logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
                 session.transfer(flowFile, REL_FAILURE);
             } else {
-                logger.info("transferring {} to 'success'", new Object[]{flowFile});
+                logger.info("transferring {} to 'success'", flowFile);
                 session.transfer(flowFile, REL_SUCCESS);
             }
         }
     }
 
+    /*
+     * Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on
+     * flowfile attribute, the specified attribute must be present and must be a long integer.
+     */
+    private boolean isAccrualPossible(FlowFile flowFile) {
+        if (rateControlCriteria.equals(ATTRIBUTE_RATE)) {
+            final String attributeValue = flowFile.getAttribute(rateControlAttribute);
+            return attributeValue != null && POSITIVE_LONG_PATTERN.matcher(attributeValue).matches();
+        }
+        return true;
+    }
+
     /*
      * Determine the amount this FlowFile will incur against the maximum allowed rate.
-     * If the value returned is negative then the flowfile given is missing the required attribute
-     * or the attribute has an invalid value for accrual.
+     * This is applicable to data size accrual only
      */
-    private long getFlowFileAccrual(FlowFile flowFile) {
-        long rateValue;
-        switch (rateControlCriteria) {
-            case DATA_RATE:
-                rateValue = flowFile.getSize();
-                break;
-            case FLOWFILE_RATE:
-                rateValue = 1;
-                break;
-            case ATTRIBUTE_RATE:
-                final String attributeValue = flowFile.getAttribute(rateControlAttribute);
-                if (attributeValue == null) {
-                    return -1L;
-                }
+    private long getDataSizeAccrual(FlowFile flowFile) {
+        return flowFile.getSize();
+    }
 
-                if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
-                    return -1L;
-                }
-                rateValue = Long.parseLong(attributeValue);
-                break;
-            default:
-                throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + rateControlCriteria);
+    /*
+     * Determine the amount this FlowFile will incur against the maximum allowed rate.
+     * This is applicable to counting accruals, flowfiles or attributes
+     */
+    private long getCountAccrual(FlowFile flowFile) {
+        long rateValue = -1L;

Review Comment:
   It would be helpful to define a `private static final` value for the default value of `-1` and reuse that in multiple places.
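
A minimal sketch of what the suggestion above could look like. The class name, the `INVALID_ACCRUAL` constant, and the pattern are hypothetical and only illustrate the idea; they are not taken from the ControlRate source.

```java
import java.util.regex.Pattern;

// Hypothetical sketch, not the ControlRate source: all names here are illustrative.
final class AccrualSketch {
    // Named sentinel that replaces a repeated magic -1L
    static final long INVALID_ACCRUAL = -1L;

    // Assumed pattern: optional leading zeros, then 1 to 18 digits so parseLong cannot overflow
    static final Pattern POSITIVE_LONG = Pattern.compile("0*[1-9][0-9]{0,17}");

    // Returns the attribute value as a long, or INVALID_ACCRUAL if it is missing or malformed
    static long countAccrual(String attributeValue) {
        if (attributeValue == null || !POSITIVE_LONG.matcher(attributeValue).matches()) {
            return INVALID_ACCRUAL;
        }
        return Long.parseLong(attributeValue);
    }

    public static void main(String[] args) {
        System.out.println(countAccrual("42"));
        System.out.println(countAccrual("abc") == INVALID_ACCRUAL);
    }
}
```

Callers can then compare against the named constant instead of a bare literal, which keeps the "invalid" meaning in one place.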



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -408,34 +498,59 @@ public FlowFileFilterResult filter(FlowFile flowFile) {
                 groupName = DEFAULT_GROUP_ATTRIBUTE;
             }
 
-            Throttle throttle = throttleMap.get(groupName);
-            if (throttle == null) {
-                throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+            Throttle dataThrottle = dataThrottleMap.get(groupName);
+            Throttle countThrottle = countThrottleMap.get(groupName);
 
-                final long newRate;
-                if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
-                    newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
-                } else {
-                    newRate = Long.parseLong(maximumRateStr);
+            boolean dataThrottlingActive = false;
+            if (dataThrottleRequired()) {
+                if (dataThrottle == null) {
+                    dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
+                    dataThrottleMap.put(groupName, dataThrottle);
                 }
-                throttle.setMaxRate(newRate);
 
-                throttleMap.put(groupName, throttle);
+                dataThrottle.lock();
+                try {
+                    if (dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) {
+                        flowFilesInBatch += 1;
+                        if (flowFilesInBatch>= flowFilesPerBatch) {
+                            flowFilesInBatch = 0;
+                            return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                        } else {
+                            // only accept flowfile if additional count throttle does not need to run
+                            if (!countThrottleRequired()) {
+                                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                            }
+                        }
+                    } else {
+                        dataThrottlingActive = true;
+                    }
+                } finally {
+                    dataThrottle.unlock();
+                }
             }
 
-            throttle.lock();
-            try {
-                if (throttle.tryAdd(accrual)) {
-                    flowFilesInBatch += 1;
-                    if (flowFilesInBatch>= flowFilesPerBatch) {
-                        flowFilesInBatch = 0;
-                        return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
-                    } else {
-                        return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            // continue processing count throttle only if required and if data throttle is not already limiting flowfiles
+            if (countThrottleRequired() && !dataThrottlingActive) {
+                if (countThrottle == null) {
+                    countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
+                    countThrottleMap.put(groupName, countThrottle);
+                }
+                countThrottle.lock();
+                try {
+                    if (countThrottle.tryAdd(getCountAccrual(flowFile))) {
+                        flowFilesInBatch += 1;

Review Comment:
   ```suggestion
                           flowFilesInBatch++;
   ```



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -408,34 +498,59 @@ public FlowFileFilterResult filter(FlowFile flowFile) {
                 groupName = DEFAULT_GROUP_ATTRIBUTE;
             }
 
-            Throttle throttle = throttleMap.get(groupName);
-            if (throttle == null) {
-                throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+            Throttle dataThrottle = dataThrottleMap.get(groupName);
+            Throttle countThrottle = countThrottleMap.get(groupName);
 
-                final long newRate;
-                if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
-                    newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
-                } else {
-                    newRate = Long.parseLong(maximumRateStr);
+            boolean dataThrottlingActive = false;
+            if (dataThrottleRequired()) {
+                if (dataThrottle == null) {
+                    dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
+                    dataThrottleMap.put(groupName, dataThrottle);
                 }
-                throttle.setMaxRate(newRate);
 
-                throttleMap.put(groupName, throttle);
+                dataThrottle.lock();
+                try {
+                    if (dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) {
+                        flowFilesInBatch += 1;

Review Comment:
   ```suggestion
                           flowFilesInBatch++;
   ```



[GitHub] [nifi] markobean commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014324326


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java:
##########
@@ -258,10 +282,245 @@ public void testNonExistingGroupAttribute() throws InterruptedException {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testIncreaseDataRate() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "11 B");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        runner.enqueue("test data 1");
+        runner.enqueue("test data 2");
+        runner.enqueue("test data 3");
+        runner.enqueue("test data 4");
+        runner.enqueue("test data 5");
+        runner.enqueue("test data 6");
+
+        runner.run(7, true);
+
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // Increase rate after stopping processor. Previous count should remain since we are still inside time period
+        runner.setProperty(ControlRate.MAX_RATE, "33 B");
+        runner.run(7, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+
+        // after 1 second, we should be able to send the up to 3 more flowfiles
+        Thread.sleep(ONE_SEC_PLUS);

Review Comment:
   Agree. I don't care for thread sleeps either, and I'm open to suggestions if you have any. Still, the existing unit tests use sleeps, so my addition is consistent. Also, if you noticed, I reduced the total sleep time (prior to adding new unit tests) by shortening each sleep. Personally, that's an issue I see frequently: sleeping much longer than necessary, which has a cumulative effect on the overall build. In this case, a minimum time of 1 sec is required because that is the minimum the property allows, and the sleep is only 1.01 sec.
   
   In other words, I feel it's the best that can be done given the situation and the component being tested.
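
For readers following the arithmetic in the quoted test: each enqueued string such as "test data 1" is 11 bytes, so a limit of 11 B admits one FlowFile per period and 33 B admits three. The sketch below models that with a simple fixed-window limiter whose time source is injected, which is one possible way to exercise such logic without sleeping. `WindowThrottleSketch` and its methods are hypothetical; this is not NiFi's `Throttle` class, and the real window behaviour may differ.

```java
import java.util.function.LongSupplier;

// Hypothetical fixed-window limiter; an illustration only, not NiFi's Throttle.
final class WindowThrottleSketch {
    private final long windowNanos;
    private final LongSupplier clock; // injected so a test can advance time without sleeping
    private long maxRate;
    private long windowStart;
    private long used;

    WindowThrottleSketch(long maxRate, long windowNanos, LongSupplier clock) {
        this.maxRate = maxRate;
        this.windowNanos = windowNanos;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    // Changing the limit keeps the amount already used in the current window
    void setMaxRate(long maxRate) {
        this.maxRate = maxRate;
    }

    // Accept the amount only if it still fits in the current window
    boolean tryAdd(long amount) {
        long now = clock.getAsLong();
        if (now - windowStart >= windowNanos) {
            windowStart = now; // a new window begins, so the usage resets
            used = 0;
        }
        if (used + amount > maxRate) {
            return false;
        }
        used += amount;
        return true;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock, in nanoseconds
        WindowThrottleSketch throttle = new WindowThrottleSketch(11L, 1_000_000_000L, () -> now[0]);
        int accepted = 0;
        for (int i = 0; i < 6; i++) {
            if (throttle.tryAdd(11L)) accepted++;
        }
        System.out.println("accepted at 11 B: " + accepted);
        throttle.setMaxRate(33L);
        for (int i = 0; i < 5; i++) {
            if (throttle.tryAdd(11L)) accepted++;
        }
        System.out.println("accepted after raising to 33 B: " + accepted);
        now[0] += 1_010_000_000L; // advance 1.01 s instead of calling Thread.sleep
        for (int i = 0; i < 3; i++) {
            if (throttle.tryAdd(11L)) accepted++;
        }
        System.out.println("accepted after the window rolls over: " + accepted);
    }
}
```

Whether an injectable clock is practical for the actual processor depends on how its time period is wired in, which this sketch does not address.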



[GitHub] [nifi] thenatog commented on pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
thenatog commented on PR #6506:
URL: https://github.com/apache/nifi/pull/6506#issuecomment-1303776884

   Will review


[GitHub] [nifi] markobean commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014409957


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -71,30 +70,57 @@ public class ControlRate extends AbstractProcessor {
     public static final String DATA_RATE = "data rate";
     public static final String FLOWFILE_RATE = "flowfile count";
     public static final String ATTRIBUTE_RATE = "attribute value";
+    public static final String DATA_OR_FLOWFILE_RATE = "data rate or flowfile count";
+
     public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE,
             "Rate is controlled by counting bytes transferred per time duration.");
     public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE,
-            "Rate is controlled by counting flowfiles transferred per time duration");
+            "Rate is controlled by counting FlowFiles transferred per time duration");
     public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
             "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
+    public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE,
+            "Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced");
 
     // based on testing to balance commits and 10,000 FF swap limit
     public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
 
     public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
             .name("Rate Control Criteria")
+            .displayName("Rate Control Criteria")
             .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
             .required(true)
-            .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+            .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE, DATA_OR_FLOWFILE_RATE_VALUE)
             .defaultValue(DATA_RATE)
             .build();
     public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
             .name("Maximum Rate")
+            .displayName("Maximum Rate")
             .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
                     + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
-            .required(true)
+            .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+            .build();
+    public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
+            .name("Maximum Data Rate")
+            .displayName("Maximum Data Rate")
+            .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+                    + "Data Size (such as '1 MB') representing bytes per Time Duration.")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
             .build();
+
+    public static final PropertyDescriptor MAX_COUNT_RATE = new PropertyDescriptor.Builder()
+            .name("Maximum FlowFile Rate")
+            .displayName("Maximum FlowFile Rate")
+            .description("The maximum rate at which FlowFiles should pass through this processor. The format of this property is expected to be a "
+                    + "positive integer representing FlowFiles count per Time Duration")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
+            .build();
+
     public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
             .name("Rate Controlled Attribute")
             .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "

Review Comment:
   Good call. The `dependsOn` wasn't an option when this property was first created. It makes sense to add it now (even though not explicitly part of this ticket.)



[GitHub] [nifi] markobean commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014414757


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -148,6 +176,8 @@ protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(RATE_CONTROL_CRITERIA);
         properties.add(MAX_RATE);
+        properties.add(MAX_DATA_RATE);
+        properties.add(MAX_COUNT_RATE);
         properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
         properties.add(TIME_PERIOD);

Review Comment:
   Moved to 2nd from top.



[GitHub] [nifi] thenatog commented on pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
thenatog commented on PR #6506:
URL: https://github.com/apache/nifi/pull/6506#issuecomment-1304064736

   I briefly reviewed the code changes and everything seems to make sense. Ideally, the inner classes that already existed could be moved to separate files to declutter the main class somewhat, potentially as another Jira task.


[GitHub] [nifi] thenatog commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
thenatog commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1014337569


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -71,30 +70,57 @@ public class ControlRate extends AbstractProcessor {
     public static final String DATA_RATE = "data rate";

Review Comment:
   Small issue but I would like to see these capitalized (Data Rate) rather than all lower case as is currently the case. Not sure if this will impact existing flows - if that's the case, leave it as is.
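
Whether capitalization affects existing flows likely depends on which string changes. The constructor calls in the quoted diff pass the same constant as both the first and second argument of `AllowableValue`. Assuming the first argument is the stored value and the second is the display name, only the display name would need to change. The sketch below uses a hypothetical `OptionSketch` class, not the NiFi API, to show the distinction.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an allowable value; NOT the NiFi AllowableValue class.
final class OptionSketch {
    final String value;       // assumed to be what a saved flow stores
    final String displayName; // assumed to be what the UI shows

    OptionSketch(String value, String displayName) {
        this.value = value;
        this.displayName = displayName;
    }

    // Known options keyed by stored value; only the display names are capitalized
    static Map<String, OptionSketch> options() {
        Map<String, OptionSketch> byValue = new LinkedHashMap<>();
        byValue.put("data rate", new OptionSketch("data rate", "Data Rate"));
        byValue.put("flowfile count", new OptionSketch("flowfile count", "FlowFile Count"));
        return byValue;
    }

    // Resolve a string read from a saved flow; returns null if the value is unknown
    static OptionSketch resolve(String stored) {
        return options().get(stored);
    }

    public static void main(String[] args) {
        OptionSketch resolved = resolve("data rate");
        System.out.println(resolved.displayName);
        System.out.println(resolve("Data Rate") == null);
    }
}
```

Under that assumption, a flow saved with `data rate` still resolves, while changing the stored value itself to `Data Rate` would not match older flows.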



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -71,30 +70,57 @@ public class ControlRate extends AbstractProcessor {
     public static final String DATA_RATE = "data rate";
     public static final String FLOWFILE_RATE = "flowfile count";
     public static final String ATTRIBUTE_RATE = "attribute value";
+    public static final String DATA_OR_FLOWFILE_RATE = "data rate or flowfile count";
+
     public static final AllowableValue DATA_RATE_VALUE = new AllowableValue(DATA_RATE, DATA_RATE,
             "Rate is controlled by counting bytes transferred per time duration.");
     public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue(FLOWFILE_RATE, FLOWFILE_RATE,
-            "Rate is controlled by counting flowfiles transferred per time duration");
+            "Rate is controlled by counting FlowFiles transferred per time duration");
     public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
             "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
+    public static final AllowableValue DATA_OR_FLOWFILE_RATE_VALUE = new AllowableValue(DATA_OR_FLOWFILE_RATE, DATA_OR_FLOWFILE_RATE,
+            "Rate is controlled by counting bytes and FlowFiles transferred per time duration; if either threshold is met, throttling is enforced");
 
     // based on testing to balance commits and 10,000 FF swap limit
     public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
 
     public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
             .name("Rate Control Criteria")
+            .displayName("Rate Control Criteria")
             .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
             .required(true)
-            .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+            .allowableValues(DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE, DATA_OR_FLOWFILE_RATE_VALUE)
             .defaultValue(DATA_RATE)
             .build();
     public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder()
             .name("Maximum Rate")
+            .displayName("Maximum Rate")
             .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
                     + "positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.")
-            .required(true)
+            .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE)
+            .build();
+    public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
+            .name("Maximum Data Rate")
+            .displayName("Maximum Data Rate")
+            .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a "
+                    + "Data Size (such as '1 MB') representing bytes per Time Duration.")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
             .build();
+
+    public static final PropertyDescriptor MAX_COUNT_RATE = new PropertyDescriptor.Builder()
+            .name("Maximum FlowFile Rate")
+            .displayName("Maximum FlowFile Rate")
+            .description("The maximum rate at which FlowFiles should pass through this processor. The format of this property is expected to be a "
+                    + "positive integer representing FlowFiles count per Time Duration")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .dependsOn(RATE_CONTROL_CRITERIA, DATA_OR_FLOWFILE_RATE)
+            .build();
+
     public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
             .name("Rate Controlled Attribute")
             .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. "

Review Comment:
   Should the `RATE_CONTROL_ATTRIBUTE_NAME` descriptor also use `dependsOn` with `ATTRIBUTE_RATE_VALUE`, or should it always be available to configure?



##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -148,6 +176,8 @@ protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(RATE_CONTROL_CRITERIA);
         properties.add(MAX_RATE);
+        properties.add(MAX_DATA_RATE);
+        properties.add(MAX_COUNT_RATE);
         properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
         properties.add(TIME_PERIOD);

Review Comment:
   The 'Time Period' descriptor should probably be moved to the first or second position in the order, as it is a mandatory field and also the most relevant one for the processor.



[GitHub] [nifi] markobean commented on a diff in pull request #6506: NIFI-10243: allow ControlRate to throttle on combination of data rate or flowfile rate

Posted by GitBox <gi...@apache.org>.
markobean commented on code in PR #6506:
URL: https://github.com/apache/nifi/pull/6506#discussion_r1018464607


##########
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java:
##########
@@ -408,34 +498,59 @@ public FlowFileFilterResult filter(FlowFile flowFile) {
                 groupName = DEFAULT_GROUP_ATTRIBUTE;
             }
 
-            Throttle throttle = throttleMap.get(groupName);
-            if (throttle == null) {
-                throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+            Throttle dataThrottle = dataThrottleMap.get(groupName);
+            Throttle countThrottle = countThrottleMap.get(groupName);
 
-                final long newRate;
-                if (DataUnit.DATA_SIZE_PATTERN.matcher(maximumRateStr).matches()) {
-                    newRate = DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue();
-                } else {
-                    newRate = Long.parseLong(maximumRateStr);
+            boolean dataThrottlingActive = false;
+            if (dataThrottleRequired()) {
+                if (dataThrottle == null) {
+                    dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
+                    dataThrottleMap.put(groupName, dataThrottle);
                 }
-                throttle.setMaxRate(newRate);
 
-                throttleMap.put(groupName, throttle);
+                dataThrottle.lock();
+                try {
+                    if (dataThrottle.tryAdd(getDataSizeAccrual(flowFile))) {
+                        flowFilesInBatch += 1;
+                        if (flowFilesInBatch>= flowFilesPerBatch) {
+                            flowFilesInBatch = 0;
+                            return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                        } else {
+                            // only accept flowfile if additional count throttle does not need to run
+                            if (!countThrottleRequired()) {
+                                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                            }
+                        }
+                    } else {
+                        dataThrottlingActive = true;
+                    }
+                } finally {
+                    dataThrottle.unlock();
+                }
             }
 
-            throttle.lock();
-            try {
-                if (throttle.tryAdd(accrual)) {
-                    flowFilesInBatch += 1;
-                    if (flowFilesInBatch>= flowFilesPerBatch) {
-                        flowFilesInBatch = 0;
-                        return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
-                    } else {
-                        return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            // continue processing count throttle only if required and if data throttle is not already limiting flowfiles
+            if (countThrottleRequired() && !dataThrottlingActive) {
+                if (countThrottle == null) {
+                    countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
+                    countThrottleMap.put(groupName, countThrottle);
+                }
+                countThrottle.lock();
+                try {
+                    if (countThrottle.tryAdd(getCountAccrual(flowFile))) {
+                        flowFilesInBatch += 1;

Review Comment:
   Done.
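
   For readers following the hunk above, the "first limit reached begins throttling" behaviour can be illustrated with a small standalone sketch. This is not the code from the PR and does not use NiFi's Throttle class: `WindowThrottle` and `DualRateGate` are hypothetical names, the limiter is a simple fixed window, and time is passed in explicitly so the behaviour is deterministic. Unlike the hunk, which calls tryAdd on the data throttle before consulting the count throttle, this sketch checks both limits before recording anything.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical fixed-window limiter; a stand-in for illustration, not NiFi's Throttle. */
final class WindowThrottle {
    private final long maxPerWindow;
    private final long windowNanos;
    private long windowStart;
    private long used;

    WindowThrottle(long maxPerWindow, long period, TimeUnit unit, long nowNanos) {
        this.maxPerWindow = maxPerWindow;
        this.windowNanos = unit.toNanos(period);
        this.windowStart = nowNanos;
    }

    /** Returns true if 'amount' still fits in the window that contains nowNanos. */
    synchronized boolean hasRoom(long amount, long nowNanos) {
        roll(nowNanos);
        return used + amount <= maxPerWindow;
    }

    /** Records 'amount' against the window that contains nowNanos. */
    synchronized void add(long amount, long nowNanos) {
        roll(nowNanos);
        used += amount;
    }

    /** Starts a fresh window once the configured period has elapsed. */
    private void roll(long nowNanos) {
        if (nowNanos - windowStart >= windowNanos) {
            windowStart = nowNanos;
            used = 0;
        }
    }
}

/** Accepts an item only while both the byte limit and the count limit have room. */
final class DualRateGate {
    private final WindowThrottle data;
    private final WindowThrottle count;

    DualRateGate(long maxBytes, long maxCount, long periodSeconds, long nowNanos) {
        this.data = new WindowThrottle(maxBytes, periodSeconds, TimeUnit.SECONDS, nowNanos);
        this.count = new WindowThrottle(maxCount, periodSeconds, TimeUnit.SECONDS, nowNanos);
    }

    /** Whichever limit is exhausted first causes the rejection. */
    synchronized boolean tryAccept(long sizeBytes, long nowNanos) {
        if (!data.hasRoom(sizeBytes, nowNanos) || !count.hasRoom(1, nowNanos)) {
            return false;
        }
        data.add(sizeBytes, nowNanos);
        count.add(1, nowNanos);
        return true;
    }
}
```

   With a gate built as `new DualRateGate(100, 2, 1, 0L)`, two 40-byte items are accepted and a third is rejected by the count limit even though bytes remain. In the next window, a 90-byte item followed by a 20-byte item is rejected by the data limit instead. One limitation of this simplification is that an item larger than the byte limit is never accepted.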


