You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mo...@apache.org on 2017/01/12 20:30:09 UTC

nifi git commit: NIFI-2861 ControlRate should accept more than one flow file per execution

Repository: nifi
Updated Branches:
  refs/heads/0.x 954201a4d -> a3d95dc15


NIFI-2861 ControlRate should accept more than one flow file per execution

Signed-off-by: Mike Moser <mo...@apache.org>

This closes #1128


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a3d95dc1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a3d95dc1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a3d95dc1

Branch: refs/heads/0.x
Commit: a3d95dc1582f2edfd7997c5d8a23105e88729d11
Parents: 954201a
Author: Joe Skora <js...@gmail.com>
Authored: Thu Oct 13 02:18:23 2016 -0400
Committer: Mike Moser <mo...@apache.org>
Committed: Thu Jan 12 15:28:45 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ControlRate.java   | 20 ++++++++++--
 .../processors/standard/TestControlRate.java    | 32 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a3d95dc1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 5612d4f..18d6ff9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -77,6 +77,9 @@ public class ControlRate extends AbstractProcessor {
     public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
             "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
 
+    // Cap on FlowFiles accepted in one onTrigger batch; value based on testing to balance commits against the 10,000 FlowFile (FF) swap limit
+    public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
+
     public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
             .name("Rate Control Criteria")
             .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
@@ -233,7 +236,7 @@ public class ControlRate extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        List<FlowFile> flowFiles = session.get(new ThrottleFilter());
+        List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
         if (flowFiles.isEmpty()) {
             context.yield();
             return;
@@ -381,6 +384,13 @@ public class ControlRate extends AbstractProcessor {
 
     private class ThrottleFilter implements FlowFileFilter {
 
+        private final int flowFilesPerBatch;
+        private int flowFilesInBatch = 0;
+
+        ThrottleFilter(final int maxFFPerBatch) { // maxFFPerBatch: number of accepted FlowFiles at which filter() returns ACCEPT_AND_TERMINATE
+            flowFilesPerBatch = maxFFPerBatch;
+        }
+
         @Override
         public FlowFileFilterResult filter(FlowFile flowFile) {
             long accrual = getFlowFileAccrual(flowFile);
@@ -409,7 +419,13 @@ public class ControlRate extends AbstractProcessor {
             throttle.lock();
             try {
                 if (throttle.tryAdd(accrual)) {
-                    return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                    flowFilesInBatch += 1;
+                    if (flowFilesInBatch>= flowFilesPerBatch) {
+                        flowFilesInBatch = 0;
+                        return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+                    } else {
+                        return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                    }
                 }
             } finally {
                 throttle.unlock();

http://git-wip-us.apache.org/repos/asf/nifi/blob/a3d95dc1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index 2e6ce45..050f818 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -24,6 +24,9 @@ import org.apache.nifi.util.TestRunners;
 
 import org.junit.Test;
 
+import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
+import static org.junit.Assert.assertEquals;
+
 public class TestControlRate {
 
     @Test
@@ -175,6 +178,35 @@ public class TestControlRate {
         runner.assertQueueEmpty();
     }
 
+    @Test
+    public void testBatchLimit() throws InterruptedException { // checks that one trigger transfers at most MAX_FLOW_FILES_PER_BATCH FlowFiles and a second trigger drains the rest
+        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+        runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
+        runner.setProperty(ControlRate.MAX_RATE, "5555"); // NOTE(review): presumably set above TEST_FILE_COUNT so the rate limit itself never rejects a file - confirm
+        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+        final int TEST_FILE_COUNT = 1500; // more than one full batch (1000) but fewer than two, so exactly two runs are needed
+
+        for (int i = 0; i < TEST_FILE_COUNT; i++) {
+            runner.enqueue("test data " + i);
+        }
+
+        runner.run(1, false);
+
+        // after the first run exactly MAX_FLOW_FILES_PER_BATCH files must be on success; the other TEST_FILE_COUNT - MAX_FLOW_FILES_PER_BATCH stay queued
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, MAX_FLOW_FILES_PER_BATCH);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueNotEmpty();
+        assertEquals(TEST_FILE_COUNT - MAX_FLOW_FILES_PER_BATCH, runner.getQueueSize().getObjectCount());
+
+        runner.run(1, false);
+
+        // after the second run the success count is cumulative: all TEST_FILE_COUNT files transferred and the input queue empty
+        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, TEST_FILE_COUNT);
+        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+        runner.assertQueueEmpty();
+    }
+
     private void createFlowFile(final TestRunner runner, final int value) {
         final Map<String, String> attributeMap = new HashMap<>();
         attributeMap.put("count", String.valueOf(value));