You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mo...@apache.org on 2017/01/12 22:06:46 UTC

nifi git commit: NIFI-2861 ControlRate should accept more than one flow file per execution * Support multiple files per onTrigger call.

Repository: nifi
Updated Branches:
  refs/heads/master 2fbeabb95 -> 4d533a99b


NIFI-2861 ControlRate should accept more than one flow file per execution
 * Support multiple files per onTrigger call.

Signed-off-by: Mike Moser <mo...@apache.org>

This closes #1412.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4d533a99
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4d533a99
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4d533a99

Branch: refs/heads/master
Commit: 4d533a99b3790ec842a7230fd6cae0d59158c2b5
Parents: 2fbeabb
Author: Joe Skora <js...@apache.org>
Authored: Thu Jan 12 16:28:34 2017 +0000
Committer: Mike Moser <mo...@apache.org>
Committed: Thu Jan 12 21:43:12 2017 +0000

----------------------------------------------------------------------
 .../nifi/processors/standard/ControlRate.java   | 24 ++++++++++++---
 .../processors/standard/TestControlRate.java    | 32 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4d533a99/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index bce67ac..006b8ed 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -77,6 +77,9 @@ public class ControlRate extends AbstractProcessor {
     public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
             "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
 
+    // based on testing to balance commits and 10,000 FF swap limit
+    public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
+
     public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
             .name("Rate Control Criteria")
             .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
@@ -233,7 +236,7 @@ public class ControlRate extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        List<FlowFile> flowFiles = session.get(new ThrottleFilter());
+        List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
         if (flowFiles.isEmpty()) {
             context.yield();
             return;
@@ -292,11 +295,11 @@ public class ControlRate extends AbstractProcessor {
             case ATTRIBUTE_RATE:
                 final String attributeValue = flowFile.getAttribute(rateControlAttribute);
                 if (attributeValue == null) {
-                    return -1l;
+                    return -1L;
                 }
 
                 if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
-                    return -1l;
+                    return -1L;
                 }
                 rateValue = Long.parseLong(attributeValue);
                 break;
@@ -381,6 +384,13 @@ public class ControlRate extends AbstractProcessor {
 
     private class ThrottleFilter implements FlowFileFilter {
 
+        private final int flowFilesPerBatch;
+        private int flowFilesInBatch = 0;
+
+        ThrottleFilter(final int maxFFPerBatch) {
+            flowFilesPerBatch = maxFFPerBatch;
+        }
+
         @Override
         public FlowFileFilterResult filter(FlowFile flowFile) {
             long accrual = getFlowFileAccrual(flowFile);
@@ -409,7 +419,13 @@ public class ControlRate extends AbstractProcessor {
             throttle.lock();
             try {
                 if (throttle.tryAdd(accrual)) {
-                    return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                    flowFilesInBatch += 1;
+                    if (flowFilesInBatch>= flowFilesPerBatch) {
+                        flowFilesInBatch = 0;
+                        return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                    } else {
+                        return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                    }
                 }
             } finally {
                 throttle.unlock();

http://git-wip-us.apache.org/repos/asf/nifi/blob/4d533a99/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index 2e6ce45..050f818 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -24,6 +24,9 @@ import org.apache.nifi.util.TestRunners;
 
 import org.junit.Test;
 
+import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
+import static org.junit.Assert.assertEquals;
+
 public class TestControlRate {
 
     @Test
@@ -175,6 +178,35 @@ public class TestControlRate {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testBatchLimit() throws InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "5555");
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        final int TEST_FILE_COUNT = 1500;
+
+        for (int i = 0; i < TEST_FILE_COUNT; i++) {
+            runner.enqueue("test data " + i);
+        }
+
+        runner.run(1, false);
+
+        // after 1 run should have MAX_FLOW_FILES_PER_BATCH files transferred and remainder of TEST_FILE_COUNT in queue
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, MAX_FLOW_FILES_PER_BATCH);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+        assertEquals(TEST_FILE_COUNT - MAX_FLOW_FILES_PER_BATCH, runner.getQueueSize().getObjectCount());
+
+        runner.run(1, false);
+
+        // after 2 runs should have TEST_FILE_COUNT files transferred and 0 in queue
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, TEST_FILE_COUNT);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueEmpty();
+    }
+
     private void createFlowFile(final TestRunner runner, final int value) {
         final Map<String, String> attributeMap = new HashMap<>();
         attributeMap.put("count", String.valueOf(value));