You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mo...@apache.org on 2017/01/12 20:30:09 UTC
nifi git commit: NIFI-2861 ControlRate should accept more than one
flow file per execution
Repository: nifi
Updated Branches:
refs/heads/0.x 954201a4d -> a3d95dc15
NIFI-2861 ControlRate should accept more than one flow file per execution
Signed-off-by: Mike Moser <mo...@apache.org>
This closes #1128
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a3d95dc1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a3d95dc1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a3d95dc1
Branch: refs/heads/0.x
Commit: a3d95dc1582f2edfd7997c5d8a23105e88729d11
Parents: 954201a
Author: Joe Skora <js...@gmail.com>
Authored: Thu Oct 13 02:18:23 2016 -0400
Committer: Mike Moser <mo...@apache.org>
Committed: Thu Jan 12 15:28:45 2017 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/ControlRate.java | 20 ++++++++++--
.../processors/standard/TestControlRate.java | 32 ++++++++++++++++++++
2 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a3d95dc1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 5612d4f..18d6ff9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -77,6 +77,9 @@ public class ControlRate extends AbstractProcessor {
public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
"Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
+ // based on testing to balance commits and 10,000 FF swap limit
+ public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
+
public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
@@ -233,7 +236,7 @@ public class ControlRate extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- List<FlowFile> flowFiles = session.get(new ThrottleFilter());
+ List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
if (flowFiles.isEmpty()) {
context.yield();
return;
@@ -381,6 +384,13 @@ public class ControlRate extends AbstractProcessor {
private class ThrottleFilter implements FlowFileFilter {
+ private final int flowFilesPerBatch;
+ private int flowFilesInBatch = 0;
+
+ ThrottleFilter(final int maxFFPerBatch) {
+ flowFilesPerBatch = maxFFPerBatch;
+ }
+
@Override
public FlowFileFilterResult filter(FlowFile flowFile) {
long accrual = getFlowFileAccrual(flowFile);
@@ -409,7 +419,13 @@ public class ControlRate extends AbstractProcessor {
throttle.lock();
try {
if (throttle.tryAdd(accrual)) {
- return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+ flowFilesInBatch += 1;
+ if (flowFilesInBatch>= flowFilesPerBatch) {
+ flowFilesInBatch = 0;
+ return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+ } else {
+ return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ }
}
} finally {
throttle.unlock();
http://git-wip-us.apache.org/repos/asf/nifi/blob/a3d95dc1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index 2e6ce45..050f818 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -24,6 +24,9 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Test;
+import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
+import static org.junit.Assert.assertEquals;
+
public class TestControlRate {
@Test
@@ -175,6 +178,35 @@ public class TestControlRate {
runner.assertQueueEmpty();
}
+ @Test
+ public void testBatchLimit() throws InterruptedException {
+ final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
+ runner.setProperty(ControlRate.MAX_RATE, "5555");
+ runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+
+ final int TEST_FILE_COUNT = 1500;
+
+ for (int i = 0; i < TEST_FILE_COUNT; i++) {
+ runner.enqueue("test data " + i);
+ }
+
+ runner.run(1, false);
+
+ // after 1 run should have MAX_FLOW_FILES_PER_BATCH files transferred and remainder of TEST_FILE_COUNT in queue
+ runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, MAX_FLOW_FILES_PER_BATCH);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueNotEmpty();
+ assertEquals(TEST_FILE_COUNT - MAX_FLOW_FILES_PER_BATCH, runner.getQueueSize().getObjectCount());
+
+ runner.run(1, false);
+
+ // after 2 runs should have TEST_FILE_COUNT files transferred and 0 in queue
+ runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, TEST_FILE_COUNT);
+ runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
+ runner.assertQueueEmpty();
+ }
+
private void createFlowFile(final TestRunner runner, final int value) {
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", String.valueOf(value));