You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by gr...@apache.org on 2022/11/22 18:14:42 UTC

[nifi] branch main updated: NIFI-10835 Improved performance of TestControlRate

This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7019e182b5 NIFI-10835 Improved performance of TestControlRate
7019e182b5 is described below

commit 7019e182b5b0e36cc46ad37456f435b2a84269d5
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Nov 16 16:03:53 2022 -0600

    NIFI-10835 Improved performance of TestControlRate
    
    - Added LongSupplier for TimedBuffer and ControlRate classes to support overriding System.currentTimeMillis()
    
    This closes #6671
    Signed-off-by: Paul Grey <gr...@apache.org>
---
 .../apache/nifi/util/timebuffer/TimedBuffer.java   |  21 ++-
 .../nifi/processors/standard/ControlRate.java      |  44 ++++---
 .../nifi/processors/standard/TestControlRate.java  | 144 ++++++++++-----------
 3 files changed, 116 insertions(+), 93 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
index dd8e5232c4..af241a525f 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
@@ -18,6 +18,7 @@ package org.apache.nifi.util.timebuffer;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
 
 public class TimedBuffer<T> {
 
@@ -25,20 +26,26 @@ public class TimedBuffer<T> {
     private final EntitySum<T>[] bins;
     private final EntityAccess<T> entityAccess;
     private final TimeUnit binPrecision;
+    private final LongSupplier currentTimeSupplier;
 
-    @SuppressWarnings("unchecked")
     public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
+        this(binPrecision, numBins, accessor, System::currentTimeMillis);
+    }
+
+    @SuppressWarnings("unchecked")
+    public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor, final LongSupplier currentTimeSupplier) {
         this.binPrecision = binPrecision;
         this.numBins = numBins + 1;
         this.bins = new EntitySum[this.numBins];
         for (int i = 0; i < this.numBins; i++) {
-            this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor);
+            this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor, currentTimeSupplier);
         }
         this.entityAccess = accessor;
+        this.currentTimeSupplier = currentTimeSupplier;
     }
 
     public T add(final T entity) {
-        final int binIdx = (int) (binPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) % numBins);
+        final int binIdx = (int) (binPrecision.convert(currentTimeSupplier.getAsLong(), TimeUnit.MILLISECONDS) % numBins);
         final EntitySum<T> sum = bins[binIdx];
 
         return sum.addOrReset(entity);
@@ -66,11 +73,13 @@ public class TimedBuffer<T> {
         private final AtomicReference<S> ref = new AtomicReference<>();
         private final TimeUnit binPrecision;
         private final int numConfiguredBins;
+        private final LongSupplier currentTimeSupplier;
 
-        public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) {
+        public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator, final LongSupplier currentTimeSupplier) {
             this.binPrecision = binPrecision;
             this.entityAccess = aggregator;
             this.numConfiguredBins = numConfiguredBins;
+            this.currentTimeSupplier = currentTimeSupplier;
         }
 
         private S add(final S event) {
@@ -92,7 +101,7 @@ public class TimedBuffer<T> {
             // entityAccess.getTimestamp(curValue) represents the time at which the current value
             // was last updated. If the last value is less than current time - 1 binPrecision, then it
             // means that we've rolled over and need to reset the value.
-            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
+            final long maxExpectedTimePeriod = currentTimeSupplier.getAsLong() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
 
             final S curValue = ref.get();
             return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod);
@@ -102,7 +111,7 @@ public class TimedBuffer<T> {
             // entityAccess.getTimestamp(curValue) represents the time at which the current value
             // was last updated. If the last value is less than current time - 1 binPrecision, then it
             // means that we've rolled over and need to reset the value.
-            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
+            final long maxExpectedTimePeriod = currentTimeSupplier.getAsLong() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
 
             final S curValue = ref.get();
             if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 34b9a8144b..ef62eee94f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.LongSupplier;
 import java.util.regex.Pattern;
 
 @SideEffectFree
@@ -165,7 +166,7 @@ public class ControlRate extends AbstractProcessor {
 
     private final ConcurrentMap<String, Throttle> dataThrottleMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, Throttle> countThrottleMap = new ConcurrentHashMap<>();
-    private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
+    private final AtomicLong lastThrottleClearTime = new AtomicLong(getCurrentTimeMillis());
     private volatile String rateControlCriteria = null;
     private volatile String rateControlAttribute = null;
     private volatile String maximumRateStr = null;
@@ -299,7 +300,7 @@ public class ControlRate extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
+        List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH, this::getCurrentTimeMillis));
         if (flowFiles.isEmpty()) {
             context.yield();
             return;
@@ -307,9 +308,9 @@ public class ControlRate extends AbstractProcessor {
 
         // Periodically clear any Throttle that has not been used in more than 2 throttling periods
         final long lastClearTime = lastThrottleClearTime.get();
-        final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long throttleExpirationMillis = getCurrentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
         if (lastClearTime < throttleExpirationMillis) {
-            if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
+            if (lastThrottleClearTime.compareAndSet(lastClearTime, getCurrentTimeMillis())) {
                 final Set<Map.Entry<String, Throttle>> throttleSet = new HashSet<>();
                 if (dataThrottleRequired()) {
                     throttleSet.addAll(dataThrottleMap.entrySet());
@@ -337,16 +338,25 @@ public class ControlRate extends AbstractProcessor {
         final ComponentLog logger = getLogger();
         for (FlowFile flowFile : flowFiles) {
             // call this to capture potential error
-            if (!isAccrualPossible(flowFile)) {
-                logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
-                session.transfer(flowFile, REL_FAILURE);
-            } else {
+            if (isAccrualPossible(flowFile)) {
                 logger.info("transferring {} to 'success'", flowFile);
                 session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                logger.error("Routing {} to 'failure' due to missing or invalid attribute", flowFile);
+                session.transfer(flowFile, REL_FAILURE);
             }
         }
     }
 
+    /**
+     * Get current time in milliseconds
+     *
+     * @return Current time in milliseconds from System
+     */
+    protected long getCurrentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+
     /*
      * Determine if the accrual amount is valid for the type of throttle being applied. For example, if throttling based on
      * flowfile attribute, the specified attribute must be present and must be a long integer.
@@ -404,15 +414,17 @@ public class ControlRate extends AbstractProcessor {
         private final long timePeriodMillis;
         private final TimedBuffer<TimestampedLong> timedBuffer;
         private final ComponentLog logger;
+        private final LongSupplier currentTimeSupplier;
 
         private volatile long penalizationPeriod = 0;
         private volatile long penalizationExpired = 0;
         private volatile long lastUpdateTime;
 
-        public Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger) {
+        private Throttle(final int timePeriod, final TimeUnit unit, final ComponentLog logger, final LongSupplier currentTimeSupplier) {
             this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit);
-            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess());
+            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new LongEntityAccess(), currentTimeSupplier);
             this.logger = logger;
+            this.currentTimeSupplier = currentTimeSupplier;
         }
 
         public void setMaxRate(final long maxRate) {
@@ -428,7 +440,7 @@ public class ControlRate extends AbstractProcessor {
             if (value < 0) {
                 return false;
             }
-            final long now = System.currentTimeMillis();
+            final long now = currentTimeSupplier.getAsLong();
             if (penalizationExpired > now) {
                 return false;
             }
@@ -478,10 +490,12 @@ public class ControlRate extends AbstractProcessor {
     private class ThrottleFilter implements FlowFileFilter {
 
         private final int flowFilesPerBatch;
+        private final LongSupplier currentTimeSupplier;
         private int flowFilesInBatch = 0;
 
-        ThrottleFilter(final int maxFFPerBatch) {
-            flowFilesPerBatch = maxFFPerBatch;
+        ThrottleFilter(final int maxFFPerBatch, final LongSupplier currentTimeSupplier) {
+            this.flowFilesPerBatch = maxFFPerBatch;
+            this.currentTimeSupplier = currentTimeSupplier;
         }
 
         @Override
@@ -505,7 +519,7 @@ public class ControlRate extends AbstractProcessor {
             boolean dataThrottlingActive = false;
             if (dataThrottleRequired()) {
                 if (dataThrottle == null) {
-                    dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    dataThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), currentTimeSupplier);
                     dataThrottle.setMaxRate(DataUnit.parseDataSize(maximumRateStr, DataUnit.B).longValue());
                     dataThrottleMap.put(groupName, dataThrottle);
                 }
@@ -534,7 +548,7 @@ public class ControlRate extends AbstractProcessor {
             // continue processing count throttle only if required and if data throttle is not already limiting flowfiles
             if (countThrottleRequired() && !dataThrottlingActive) {
                 if (countThrottle == null) {
-                    countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger());
+                    countThrottle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger(), currentTimeSupplier);
                     countThrottle.setMaxRate(Long.parseLong(maximumCountRateStr));
                     countThrottleMap.put(groupName, countThrottle);
                 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
index 0b68022622..eb33a7c9df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java
@@ -27,18 +27,30 @@ import java.util.Map;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 public class TestControlRate {
 
-    private static final long ONE_SEC_PLUS = 1010L;
+    private static final String ONE_SECOND_TIME_PERIOD = "1 s";
+
+    private static final long CURRENT_TIME_INCREMENT = 1100;
+
+    private ConfigurableControlRate controlRate;
+
+    private TestRunner runner;
+
+    @BeforeEach
+    public void setRunner() {
+        controlRate = new ConfigurableControlRate();
+        runner = TestRunners.newTestRunner(controlRate);
+    }
 
     @Test
     public void testLimitExceededThenOtherLimitNotExceeded() {
         // If we have flowfiles queued that have different values for the "Rate Controlled Attribute"
         // and we encounter a FlowFile whose rate should be throttled, we should continue pulling other flowfiles
         // whose rate does not need to be throttled.
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "3");
         runner.setProperty(ControlRate.TIME_PERIOD, "1 min");
@@ -64,11 +76,10 @@ public class TestControlRate {
     }
 
     @Test
-    public void testFileCountRate() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testFileCountRate() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "3");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
 
         runner.enqueue("test data 1");
         runner.enqueue("test data 2");
@@ -86,18 +97,18 @@ public class TestControlRate {
         runner.assertQueueNotEmpty();
 
         // we have sent 3 files and after 1 second, we should be able to send the 4th
-        Thread.sleep(ONE_SEC_PLUS);
-        runner.run();
+        incrementCurrentTime();
+
+        runner.run(5);
         runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
         runner.assertQueueEmpty();
     }
 
     @Test
-    public void testFileCountWithGrouping() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testFileCountWithGrouping() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "2");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
         runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
 
         createFlowFileWithGroup(runner, "one");
@@ -118,18 +129,17 @@ public class TestControlRate {
         runner.assertQueueNotEmpty();
 
         // we have sent 2 files per group and after 1 second, we should be able to send the remaining 1 file per group
-        Thread.sleep(ONE_SEC_PLUS);
+        incrementCurrentTime();
         runner.run(2);
         runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
         runner.assertQueueEmpty();
     }
 
     @Test
-    public void testDataSizeRate() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testDataSizeRate() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "20 b");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
 
         runner.enqueue("testdata 1");
         runner.enqueue("testdata 2");
@@ -147,19 +157,18 @@ public class TestControlRate {
         runner.assertQueueNotEmpty();
 
         // we have sent 20 bytes and after 1 second, we should be able to send 20 more
-        Thread.sleep(ONE_SEC_PLUS);
+        incrementCurrentTime();
         runner.run(2, false);
         runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 2);
         runner.assertQueueEmpty();
     }
 
     @Test
-    public void testViaAttribute() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testViaAttribute() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
         runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
         runner.setProperty(ControlRate.MAX_RATE, "20000");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
 
         createFlowFile(runner, 1000);
         createFlowFile(runner, 3000);
@@ -178,14 +187,14 @@ public class TestControlRate {
         runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueNotEmpty();
-        Thread.sleep(1200L);
+        incrementCurrentTime(1450);
 
         // at this point, more than TIME_PERIOD 1.0 seconds but less than 1.45 seconds have passed
         runner.run(50, false);
         runner.assertTransferCount(ControlRate.REL_SUCCESS, 0);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueNotEmpty();
-        Thread.sleep(600L);
+        incrementCurrentTime(600);
 
         // at this point, more than 1.45 seconds have passed, so we should be able to send another 20,000
         runner.run();
@@ -195,12 +204,11 @@ public class TestControlRate {
     }
 
     @Test
-    public void testAttributeDoesNotExist() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testAttributeDoesNotExist() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
         runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "no.such.attribute");
         runner.setProperty(ControlRate.MAX_RATE, "20000");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
 
         createFlowFile(runner, 1000);
         createFlowFile(runner, 3000);
@@ -218,11 +226,10 @@ public class TestControlRate {
 
     @Test
     public void testBadAttributeRate() {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.ATTRIBUTE_RATE);
         runner.setProperty(ControlRate.RATE_CONTROL_ATTRIBUTE_NAME, "count");
         runner.setProperty(ControlRate.MAX_RATE, "20000");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
 
         final Map<String, String> attributeMap = new HashMap<>();
         attributeMap.put("count", "bad string");
@@ -236,10 +243,9 @@ public class TestControlRate {
 
     @Test
     public void testBatchLimit() {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "5555");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
 
         final int TEST_FILE_COUNT = 1500;
 
@@ -265,10 +271,9 @@ public class TestControlRate {
 
     @Test
     public void testNonExistingGroupAttribute() {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "2");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
         runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group");
 
         createFlowFileWithGroup(runner, "one");
@@ -283,11 +288,10 @@ public class TestControlRate {
     }
 
     @Test
-    public void testIncreaseDataRate() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testIncreaseDataRate() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "11 B");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
 
         runner.enqueue("test data 1");
         runner.enqueue("test data 2");
@@ -310,7 +314,7 @@ public class TestControlRate {
         runner.assertQueueNotEmpty();
 
         // after 1 second, we should be able to send the up to 3 more flowfiles
-        Thread.sleep(ONE_SEC_PLUS);
+        incrementCurrentTime();
         runner.run(7, false);
         runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
@@ -318,11 +322,10 @@ public class TestControlRate {
     }
 
     @Test
-    public void testIncreaseFlowFileRate() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testIncreaseFlowFileRate() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
         runner.setProperty(ControlRate.MAX_RATE, "1");
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
 
         runner.enqueue("test data 1");
         runner.enqueue("test data 2");
@@ -345,7 +348,7 @@ public class TestControlRate {
         runner.assertQueueNotEmpty();
 
         // after 1 second, we should be able to send the up to 3 more flowfiles
-        Thread.sleep(ONE_SEC_PLUS);
+        incrementCurrentTime();
         runner.run(7, false);
         runner.assertTransferCount(ControlRate.REL_SUCCESS, 6);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
@@ -353,10 +356,9 @@ public class TestControlRate {
     }
 
     @Test
-    public void testDataOrFlowFileCountLimitedByBytes() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testDataOrFlowFileCountLimitedByBytes() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
         // Data rate will throttle before FlowFile count
         runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
         runner.setProperty(ControlRate.MAX_COUNT_RATE, "3");
@@ -377,17 +379,16 @@ public class TestControlRate {
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueNotEmpty();
         // we have sent 22 bytes and after 1 second, we should be able to send 22 more
-        Thread.sleep(ONE_SEC_PLUS);
+        incrementCurrentTime(1500);
         runner.run(4, false);
         runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
         runner.assertQueueEmpty();
     }
 
     @Test
-    public void testDataOrFlowFileCountLimitedByCount() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testDataOrFlowFileCountLimitedByCount() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
         // FlowFile count rate will throttle before data rate
         runner.setProperty(ControlRate.MAX_DATA_RATE, "44 B"); // greater than all flowfiles to be queued
         runner.setProperty(ControlRate.MAX_COUNT_RATE, "1");  // limit to 1 flowfile per second
@@ -396,32 +397,23 @@ public class TestControlRate {
         runner.enqueue("test data 2");
         runner.enqueue("test data 3");
 
-        runner.run(4, false);
+        runner.run(1, false);
 
         runner.assertTransferCount(ControlRate.REL_SUCCESS, 1);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueNotEmpty();
 
-        // we have sent 1 flowfile and after 1 second, we should be able to send 1 more
-        Thread.sleep(ONE_SEC_PLUS);
-        runner.run(4, false);
-        runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
-        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
-        runner.assertQueueNotEmpty();
-
-        // we have sent 2 flowfile and after 1 second, we should be able to send 1 more
-        Thread.sleep(ONE_SEC_PLUS);
-        runner.run(4, false);
-        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 3);
+        incrementCurrentTime(2000);
+        runner.run(1, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 3);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueEmpty();
     }
 
     @Test
-    public void testDataOrFlowFileCountLimitedByBytesThenCount() throws InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
+    public void testDataOrFlowFileCountLimitedByBytesThenCount() {
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_OR_FLOWFILE_RATE);
-        runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");
+        runner.setProperty(ControlRate.TIME_PERIOD, ONE_SECOND_TIME_PERIOD);
         // Data rate will throttle before FlowFile count
         runner.setProperty(ControlRate.MAX_DATA_RATE, "22 B");
         runner.setProperty(ControlRate.MAX_COUNT_RATE, "5");
@@ -440,27 +432,17 @@ public class TestControlRate {
         runner.assertTransferCount(ControlRate.REL_SUCCESS, 2);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueNotEmpty();
-        runner.clearTransferState();
 
         // we have sent 2 flowfile and after 1 second, we should be able to send more, now limited by flowfile count
-        Thread.sleep(ONE_SEC_PLUS);
-        runner.run(10, false);
-        runner.assertTransferCount(ControlRate.REL_SUCCESS, 5);
-        runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
-        runner.assertQueueNotEmpty();
-        runner.clearTransferState();
-
-        // after 1 second, we should be able to send the remaining flowfile
-        Thread.sleep(ONE_SEC_PLUS);
-        runner.run(10, false);
-        runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 1);
+        incrementCurrentTime(1500);
+        runner.run(1, false);
+        runner.assertTransferCount(ControlRate.REL_SUCCESS, 8);
         runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
         runner.assertQueueEmpty();
     }
 
     @Test
     public void testValidate() {
-        final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
         runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.DATA_RATE);
         runner.assertNotValid(); // MAX_RATE is not set
         runner.setProperty(ControlRate.MAX_RATE, "1");
@@ -527,4 +509,22 @@ public class TestControlRate {
         attributeMap.put("group", group);
         runner.enqueue(new byte[0], attributeMap);
     }
+
+    private void incrementCurrentTime() {
+        controlRate.currentTimeMillis += CURRENT_TIME_INCREMENT;
+    }
+
+    private void incrementCurrentTime(final long milliseconds) {
+        controlRate.currentTimeMillis += milliseconds;
+    }
+
+    private static class ConfigurableControlRate extends ControlRate {
+
+        private long currentTimeMillis = System.currentTimeMillis();
+
+        @Override
+        protected long getCurrentTimeMillis() {
+            return currentTimeMillis;
+        }
+    }
 }