You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/11/22 18:14:42 UTC
[nifi] branch main updated: NIFI-10835 Improved performance of TestControlRate
This is an automated email from the ASF dual-hosted git repository.
greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7019e182b5 NIFI-10835 Improved performance of TestControlRate
7019e182b5 is described below
commit 7019e182b5b0e36cc46ad37456f435b2a84269d5
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Nov 16 16:03:53 2022 -0600
NIFI-10835 Improved performance of TestControlRate
- Added LongSupplier for TimedBuffer and ControlRate classes to support overriding System.currentTimeMillis()
This closes #6671
Signed-off-by: Paul Grey <gr...@apache.org>
---
.../apache/nifi/util/timebuffer/TimedBuffer.java | 21 ++-
.../nifi/processors/standard/ControlRate.java | 44 ++++---
.../nifi/processors/standard/TestControlRate.java | 144 ++++++++++-----------
3 files changed, 116 insertions(+), 93 deletions(-)
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
index dd8e5232c4..af241a525f 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
@@ -18,6 +18,7 @@ package org.apache.nifi.util.timebuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
public class TimedBuffer<T> {
@@ -25,20 +26,26 @@ public class TimedBuffer<T> {
private final EntitySum<T>[] bins;
private final EntityAccess<T> entityAccess;
private final TimeUnit binPrecision;
+ private final LongSupplier currentTimeSupplier;
- @SuppressWarnings("unchecked")
public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
+ this(binPrecision, numBins, accessor, System::currentTimeMillis);
+ }
+
+ @SuppressWarnings("unchecked")
+ public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor, final LongSupplier currentTimeSupplier) {
this.binPrecision = binPrecision;
this.numBins = numBins + 1;
this.bins = new EntitySum[this.numBins];
for (int i = 0; i < this.numBins; i++) {
- this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor);
+ this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor, currentTimeSupplier);
}
this.entityAccess = accessor;
+ this.currentTimeSupplier = currentTimeSupplier;
}
public T add(final T entity) {
- final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins);
+ final int binIdx = (int) (binPrecision.convert(currentTimeSupplier.getAsLong(), TimeUnit.MILLISECONDS) % numBins);
final EntitySum<T> sum = bins[binIdx];
return sum.addOrReset(entity);
@@ -66,11 +73,13 @@ public class TimedBuffer<T> {
private final AtomicReference<S> ref = new AtomicReference<>();
private final TimeUnit binPrecision;
private final int numConfiguredBins;
+ private final LongSupplier currentTimeSupplier;
- public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) {
+ public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator, final LongSupplier currentTimeSupplier) {
this.binPrecision = binPrecision;
this.entityAccess = aggregator;
this.numConfiguredBins = numConfiguredBins;
+ this.currentTimeSupplier = currentTimeSupplier;
}
private S add(final S event) {
@@ -92,7 +101,7 @@ public class TimedBuffer<T> {
// entityAccess.getTimestamp(curValue) represents the time at which the current value
// was last updated. If the last value is less than current time - numConfiguredBins binPrecision units, then it
// means that we've rolled over and need to reset the value.
- final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
+ final long maxExpectedTimePeriod = currentTimeSupplier.getAsLong() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
final S curValue = ref.get();
return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod);
@@ -102,7 +111,7 @@ public class TimedBuffer<T> {
// entityAccess.getTimestamp(curValue) represents the time at which the current value
// was last updated. If the last value is less than current time - 1 binPrecision, then it
// means that we've rolled over and need to reset the value.
- final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
+ final long maxExpectedTimePeriod = currentTimeSupplier.getAsLong() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
final S curValue = ref.get();
if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 34b9a8144b..ef62eee94f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.LongSupplier;
import java.util.regex.Pattern;
@SideEffectFree
@@ -165,7 +166,7 @@ public class ControlRate extends AbstractProcessor {
private final ConcurrentMap<String, Throttle> dataThrottleMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Throttle> countThrottleMap = new ConcurrentHashMap<>();
- private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
+ private final AtomicLong lastThrottleClearTime = new AtomicLong(getCurrentTimeMillis());
private volatile String rateControlCriteria = null;
private volatile String rateControlAttribute = null;
private volatile String maximumRateStr = null;
@@ -299,7 +300,7 @@ public class ControlRate extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
+ List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis));
if (flowFiles.isEmpty()) {
context.yield();
return;
@@ -307,9 +308,9 @@ public class ControlRate extends AbstractProcessor {
// Periodically clear any Throttle that has not been used in more than 2 throttling periods
final long lastClearTime = lastThrottleClearTime.get();
- final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long throttleExpirationMillis = getCurrentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
if (lastClearTime < throttleExpirationMillis) {
- if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
+ if (lastThrottleClearTime.compareAndSet(lastClearTime, getCurrentTimeMillis())) {
final Set<Map.Entry<String, Throttle>> throttleSet = new HashSet<>();
if (dataThrottleRequired()) {
throttleSet.addAll(dataThrottleMap.entrySet());
@@ -337,16 +338,25 @@ public class ControlRate extends AbstractProcessor {
final ComponentLog logger = getLogger();
for (FlowFile flowFile : flowFiles) {
// call this to capture potential error
- if (!isAccrualPossible(flowFile)) {
- logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } else {
+ if (isAccrualPossible(flowFile)) {
logger.info("transferring {} to 'success'", flowFile);
session.transfer(flowFile, REL_SUCCESS);
+ } else {
+ logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
+ session.transfer(flowFile, REL_FAILURE);
}
}
}
+ /**
+ * Get current time in milliseconds
+ *
+ * @return Current time in milliseconds from System
+ */
+ protected long getCurrentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+
/*
* Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on
* flowfile attribute, the specified attribute must be present and must be a long integer.
@@ -404,15 +414,17 @@ public class ControlRate extends AbstractProcessor {
private final long timePeriodMillis;
private final TimedBuffer<TimestampedLong> timedBuffer;
private final ComponentLog logger;
+ private final LongSupplier currentTimeSupplier;
private volatile long penalizationPeriod = 0;
private volatile long penalizationExpired = 0;
private volatile long lastUpdateTime;
- public Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger) {
+ private Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger, final LongSupplier currentTimeSupplier) {
this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit);
- this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess());
+ this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(), currentTimeSupplier);
this.logger = logger;
+ this.currentTimeSupplier = currentTimeSupplier;
}
public void setMaxRate(final long maxRate) {
@@ -428,7 +440,7 @@ public class ControlRate extends AbstractProcessor {
if (value < 0) {
return false;
}
- final long now = System.currentTimeMillis();
+ final long now = currentTimeSupplier.getAsLong();
if (penalizationExpired > now) {
return false;
}
@@ -478,10 +490,12 @@ public class ControlRate extends AbstractProcessor {
private class ThrottleFilter implements FlowFileFilter {
private final int flowFilesPerBatch;
+ private final LongSupplier currentTimeSupplier;
private int flowFilesInBatch = 0;
- ThrottleFilter(final int maxFFPerBatch) {
- flowFilesPerBatch = maxFFPerBatch;
+ ThrottleFilter(final int maxFFPerBatch, final LongSupplier currentTimeSupplier) {
+ this.flowFilesPerBatch = maxFFPerBatch;
+ this.currentTimeSupplier = currentTimeSupplier;
}
@Override
@@ -505,7 +519,7 @@ public class ControlRate extends AbstractProcessor {
boolean dataThrottlingActive = false;
if (dataThrottleRequired()) {
if (dataThrottle == null) {
- dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+ dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), currentTimeSupplier);
dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
dataThrottleMap.put(groupName, dataThrottle);
}
@@ -534,7 +548,7 @@ public class ControlRate extends AbstractProcessor {
// continue processing count throttle only if required and if data throttle is not already limiting flowfiles
if (countThrottleRequired() && !dataThrottlingActive) {
if (countThrottle == null) {
- countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+ countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), currentTimeSupplier);
countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
countThrottleMap.put(groupName, countThrottle);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index 0b68022622..eb33a7c9df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -27,18 +27,30 @@ import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class TestControlRate {
- private static final long ONE_SEC_PLUS = 1010L;
+ private static final String ONE_SECOND_TIME_PERIOD = "1 s";
+
+ private static final long CURRENT_TIME_INCREMENT = 1100;
+
+ private ConfigurableControlRate controlRate;
+
+ private TestRunner runner;
+
+ @BeforeEach
+ public void setRunner() {
+ controlRate = new ConfigurableControlRate();
+ runner = TestRunners.newTestRunner(controlRate);
+ }
@Test
public void testLimitExceededThenOtherLimitNotExceeded() {
// If we have flowfiles queued that have different values for the "Rate Controlled Attribute"
// and we encounter a FlowFile whose rate should be throttled, we should continue pulling other flowfiles
// whose rate does not need to be throttled.
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "3");
runner.setProperty(ControlRate.TIME_PERIOD, "1 min");
@@ -64,11 +76,10 @@ public class TestControlRate {
}
@Test
- public void testFileCountRate() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testFileCountRate() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "3");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.enqueue("test data 1");
runner.enqueue("test data 2");
@@ -86,18 +97,18 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 3 files and after 1 second, we should be able to send the 4th
- Thread.sleep(ONE_SEC_PLUS);
- runner.run();
+ incrementCurrentTime();
+
+ runner.run(5);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
runner.assertQueueEmpty();
}
@Test
- public void testFileCountWithGrouping() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testFileCountWithGrouping() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "2");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
createFlowFileWithGroup(runner, "one");
@@ -118,18 +129,17 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group
- Thread.sleep(ONE_SEC_PLUS);
+ incrementCurrentTime();
runner.run(2);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertQueueEmpty();
}
@Test
- public void testDataSizeRate() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testDataSizeRate() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.setProperty(ControlRate.MAX_RATE, "20 b");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.enqueue("testdata 1");
runner.enqueue("testdata 2");
@@ -147,19 +157,18 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// we have sent 20 bytes and after 1 second, we should be able to send 20 more
- Thread.sleep(ONE_SEC_PLUS);
+ incrementCurrentTime();
runner.run(2, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
runner.assertQueueEmpty();
}
@Test
- public void testViaAttribute() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testViaAttribute() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
runner.setProperty(ControlRate.MAX_RATE, "20000");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
createFlowFile(runner, 1000);
createFlowFile(runner, 3000);
@@ -178,14 +187,14 @@ public class TestControlRate {
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
- Thread.sleep(1200L);
+ incrementCurrentTime(1450);
// at this point, more than TIME_PERIOD 1.0 seconds but less than 1.45 seconds have passed
runner.run(50, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
- Thread.sleep(600L);
+ incrementCurrentTime(600);
// at this point, more than 1.45 seconds have passed, so we should be able to send another 20,000
runner.run();
@@ -195,12 +204,11 @@ public class TestControlRate {
}
@Test
- public void testAttributeDoesNotExist() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testAttributeDoesNotExist() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "no.such.attribute");
runner.setProperty(ControlRate.MAX_RATE, "20000");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
createFlowFile(runner, 1000);
createFlowFile(runner, 3000);
@@ -218,11 +226,10 @@ public class TestControlRate {
@Test
public void testBadAttributeRate() {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
runner.setProperty(ControlRate.MAX_RATE, "20000");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", "bad string");
@@ -236,10 +243,9 @@ public class TestControlRate {
@Test
public void testBatchLimit() {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "5555");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
final int TEST_FILE_COUNT = 1500;
@@ -265,10 +271,9 @@ public class TestControlRate {
@Test
public void testNonExistingGroupAttribute() {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "2");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
createFlowFileWithGroup(runner, "one");
@@ -283,11 +288,10 @@ public class TestControlRate {
}
@Test
- public void testIncreaseDataRate() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testIncreaseDataRate() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.setProperty(ControlRate.MAX_RATE, "11 B");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.enqueue("test data 1");
runner.enqueue("test data 2");
@@ -310,7 +314,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// after 1 second, we should be able to send up to 3 more flowfiles
- Thread.sleep(ONE_SEC_PLUS);
+ incrementCurrentTime();
runner.run(7, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
@@ -318,11 +322,10 @@ public class TestControlRate {
}
@Test
- public void testIncreaseFlowFileRate() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testIncreaseFlowFileRate() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "1");
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
runner.enqueue("test data 1");
runner.enqueue("test data 2");
@@ -345,7 +348,7 @@ public class TestControlRate {
runner.assertQueueNotEmpty();
// after 1 second, we should be able to send up to 3 more flowfiles
- Thread.sleep(ONE_SEC_PLUS);
+ incrementCurrentTime();
runner.run(7, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
@@ -353,10 +356,9 @@ public class TestControlRate {
}
@Test
- public void testDataOrFlowFileCountLimitedByBytes() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testDataOrFlowFileCountLimitedByBytes() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
// Data rate will throttle before FlowFile count
runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
runner.setProperty(ControlRate.MAX_COUNT_RATE, "3");
@@ -377,17 +379,16 @@ public class TestControlRate {
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
// we have sent 22 bytes and after 1 second, we should be able to send 22 more
- Thread.sleep(ONE_SEC_PLUS);
+ incrementCurrentTime(1500);
runner.run(4, false);
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
runner.assertQueueEmpty();
}
@Test
- public void testDataOrFlowFileCountLimitedByCount() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testDataOrFlowFileCountLimitedByCount() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
// FlowFile count rate will throttle before data rate
runner.setProperty(ControlRate.MAX_DATA_RATE, "44 B"); // greater than all flowfiles to be queued
runner.setProperty(ControlRate.MAX_COUNT_RATE, "1"); // limit to 1 flowfile per second
@@ -396,32 +397,23 @@ public class TestControlRate {
runner.enqueue("test data 2");
runner.enqueue("test data 3");
- runner.run(4, false);
+ runner.run(1, false);
runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
- // we have sent 1 flowfile and after 1 second, we should be able to send 1 more
- Thread.sleep(ONE_SEC_PLUS);
- runner.run(4, false);
- runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
- runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
- runner.assertQueueNotEmpty();
-
- // we have sent 2 flowfile and after 1 second, we should be able to send 1 more
- Thread.sleep(ONE_SEC_PLUS);
- runner.run(4, false);
- runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3);
+ incrementCurrentTime(2000);
+ runner.run(1, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
@Test
- public void testDataOrFlowFileCountLimitedByBytesThenCount() throws InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+ public void testDataOrFlowFileCountLimitedByBytesThenCount() {
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
- runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+ runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
// Data rate will throttle before FlowFile count
runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
runner.setProperty(ControlRate.MAX_COUNT_RATE, "5");
@@ -440,27 +432,17 @@ public class TestControlRate {
runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
- runner.clearTransferState();
// we have sent 2 flowfiles and after 1 second, we should be able to send more, now limited by flowfile count
- Thread.sleep(ONE_SEC_PLUS);
- runner.run(10, false);
- runner.assertTransferCount(ControlRate.REL_SUCCESS, 5);
- runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
- runner.assertQueueNotEmpty();
- runner.clearTransferState();
-
- // after 1 second, we should be able to send the remaining flowfile
- Thread.sleep(ONE_SEC_PLUS);
- runner.run(10, false);
- runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
+ incrementCurrentTime(1500);
+ runner.run(1, false);
+ runner.assertTransferCount(ControlRate.REL_SUCCESS, 8);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}
@Test
public void testValidate() {
- final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
runner.assertNotValid(); // MAX_RATE is not set
runner.setProperty(ControlRate.MAX_RATE, "1");
@@ -527,4 +509,22 @@ public class TestControlRate {
attributeMap.put("group", group);
runner.enqueue(new byte[0], attributeMap);
}
+
+ private void incrementCurrentTime() {
+ controlRate.currentTimeMillis += CURRENT_TIME_INCREMENT;
+ }
+
+ private void incrementCurrentTime(final long milliseconds) {
+ controlRate.currentTimeMillis += milliseconds;
+ }
+
+ private static class ConfigurableControlRate extends ControlRate {
+
+ private long currentTimeMillis = System.currentTimeMillis();
+
+ @Override
+ protected long getCurrentTimeMillis() {
+ return currentTimeMillis;
+ }
+ }
}