You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:43:14 UTC

[nifi] 20/23: NIFI-6510 Adjusted interval and incorporated R-squared check

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit f093f48bdf7f9d37269f00010764f898b1d7e1fe
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Thu Aug 8 19:54:46 2019 -0400

    NIFI-6510 Adjusted interval and incorporated R-squared check
    
    Updates to support multiple feature variables and to clear the cached regression model based on R-squared values
    
    Added an ordinary least squares model, which truly uses multivariable regression. Refactored the interfaces to include a more general interface for variate models (one that includes scoring support).
    
    RAT check fixes
    
    Added test for SimpleRegression. Minor fix for OLS model
    
    fixed test errors
    
    fixed checkstyle errors
    
    (cherry picked from commit fab411b)
---
 .../status/analytics/StatusAnalyticsModel.java     |   4 +-
 .../org/apache/nifi/controller/FlowController.java |   4 +-
 .../CachingConnectionStatusAnalyticsEngine.java    |  17 +-
 .../analytics/ConnectionStatusAnalytics.java       | 190 ++++++++++++++++-----
 .../analytics/ConnectionStatusAnalyticsEngine.java |  12 +-
 .../BivariateStatusAnalyticsModel.java             |  22 ++-
 .../MultivariateStatusAnalyticsModel.java}         |  28 ++-
 .../analytics/models/OrdinaryLeastSquaresMSAM.java | 103 +++++++++++
 .../{ => models}/SimpleRegressionBSAM.java         |  56 ++++--
 .../models/VariateStatusAnalyticsModel.java        |  18 +-
 ...TestCachingConnectionStatusAnalyticsEngine.java |   7 +-
 .../analytics/TestConnectionStatusAnalytics.java   |  41 ++++-
 .../TestConnectionStatusAnalyticsEngine.java       |   5 +-
 .../analytics/TestStatusAnalyticsEngine.java       |   6 +-
 .../models/TestOrdinaryLeastSqauresMSAM.java       | 159 +++++++++++++++++
 .../analytics/models/TestSimpleRegressionBSAM.java | 102 +++++++++++
 16 files changed, 666 insertions(+), 108 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
index 72c81b1..4869a0f 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
@@ -20,8 +20,8 @@ import java.util.stream.Stream;
 
 public interface StatusAnalyticsModel {
 
-    void learn(Stream<Double> features, Stream<Double> labels);
-    Double predict(Double feature);
+    void learn(Stream<Double[]> features, Stream<Double> labels);
+    Double predict(Double[] feature);
     Boolean supportsOnlineLearning();
 
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 33ad7f7..bf1d06c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -55,7 +55,6 @@ import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
@@ -174,6 +173,7 @@ import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.parameter.StandardParameterContextManager;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
@@ -594,7 +594,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         }
 
         componentStatusRepository = createComponentStatusRepository();
-        analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
+        analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository,flowFileEventRepository);
         eventAccess = new StandardEventAccess(this, flowFileEventRepository);
 
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
index f69ed33..747c496 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.status.analytics;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,14 +28,16 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 
 public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
-    private ComponentStatusRepository statusRepository;
-    private FlowManager flowManager;
+    private final ComponentStatusRepository statusRepository;
+    private final FlowManager flowManager;
+    private final FlowFileEventRepository flowFileEventRepository;
     private volatile Cache<String, ConnectionStatusAnalytics> cache;
     private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class);
 
-    public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) {
+    public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository) {
         this.flowManager = flowManager;
         this.statusRepository = statusRepository;
+        this.flowFileEventRepository = flowFileEventRepository;
         this.cache = Caffeine.newBuilder()
                 .expireAfterWrite(30, TimeUnit.MINUTES)
                 .build();
@@ -44,12 +47,12 @@ public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEn
     public StatusAnalytics getStatusAnalytics(String identifier) {
 
         ConnectionStatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier);
-        if(connectionStatusAnalytics == null){
+        if (connectionStatusAnalytics == null) {
             LOG.debug("Creating new status analytics object for connection id: {}", identifier);
-            connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,identifier,true);
+            connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager,flowFileEventRepository, identifier, true);
             connectionStatusAnalytics.init();
-            cache.put(identifier,connectionStatusAnalytics);
-        }else{
+            cache.put(identifier, connectionStatusAnalytics);
+        } else {
             LOG.debug("Pulled existing analytics from cache for connection id: {}", identifier);
             connectionStatusAnalytics.refresh();
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 0cb6b5d..aa19b1d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -22,10 +22,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.Random;
 import java.util.stream.Stream;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.RepositoryStatusReport;
+import org.apache.nifi.controller.status.analytics.models.MultivariateStatusAnalyticsModel;
+import org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquaresMSAM;
+import org.apache.nifi.controller.status.analytics.models.VariateStatusAnalyticsModel;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.StatusHistory;
@@ -38,19 +46,26 @@ import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.primitives.Doubles;
+
 public class ConnectionStatusAnalytics implements StatusAnalytics {
 
     private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalytics.class);
     private Map<String, Tuple<StatusAnalyticsModel, ExtractFunction>> modelMap;
     private QueryWindow queryWindow;
     private final ComponentStatusRepository componentStatusRepository;
+    private final FlowFileEventRepository flowFileEventRepository;
     private final String connectionIdentifier;
     private final FlowManager flowManager;
     private final Boolean supportOnlineLearning;
+    private Boolean extendWindow = false;
+    private static double SCORE_THRESHOLD = .90;
 
-    public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, String connectionIdentifier, Boolean supportOnlineLearning) {
+    public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, String connectionIdentifier,
+                                     Boolean supportOnlineLearning) {
         this.componentStatusRepository = componentStatusRepository;
         this.flowManager = flowManager;
+        this.flowFileEventRepository = flowFileEventRepository;
         this.connectionIdentifier = connectionIdentifier;
         this.supportOnlineLearning = supportOnlineLearning;
     }
@@ -60,8 +75,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         LOG.debug("Initialize analytics connection id: {} ", connectionIdentifier);
 
         if (this.modelMap == null || this.modelMap.isEmpty()) {
-            Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(!supportsOnlineLearning()), extract);
-            Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(!supportsOnlineLearning()), extract);
+            Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new OrdinaryLeastSquaresMSAM(), extract);
+            Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new OrdinaryLeastSquaresMSAM(), extract);
             this.modelMap = new HashMap<>();
             //TODO: Should change keys used here
             this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), countModelFunction);
@@ -73,38 +88,39 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
 
     public void refresh() {
 
-        LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier);
         if (this.queryWindow == null) {
+            //Set query window to fresh value
             this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
-        } else if (supportsOnlineLearning()) {
-            this.queryWindow = new QueryWindow(queryWindow.getEndTimeMillis(), System.currentTimeMillis());
+        } else if (supportOnlineLearning) {
+            //Obtain latest observations when available, extend window if needed to obtain minimum observations
+            this.queryWindow = new QueryWindow(extendWindow ? queryWindow.getStartTimeMillis() : queryWindow.getEndTimeMillis(), System.currentTimeMillis());
+
         }
         modelMap.forEach((metric, modelFunction) -> {
 
             StatusAnalyticsModel model = modelFunction.getKey();
             ExtractFunction extract = modelFunction.getValue();
             StatusHistory statusHistory = componentStatusRepository.getConnectionStatusHistory(connectionIdentifier, queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), Integer.MAX_VALUE);
-            Tuple<Stream<Double>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory);
-            Stream<Double> times = modelData.getKey();
-            Stream<Double> values = modelData.getValue();
-            //times are the X axis and values are on the y axis
-            model.learn(times, values);
+            Tuple<Stream<Double[]>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory);
+            Double[][] features = modelData.getKey().toArray(size -> new Double[size][1]);
+            Double[] values = modelData.getValue().toArray(size -> new Double[size]);
+
+            if (ArrayUtils.isNotEmpty(features)) {
+                try {
+                    LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier);
+                    model.learn(Stream.of(features), Stream.of(values));
+                    extendWindow = false;
+                } catch (Exception ex) {
+                    LOG.debug("Exception encountered while training model for connection id {}: {}", connectionIdentifier, ex.getMessage());
+                    extendWindow = true;
+                }
+            } else {
+                extendWindow = true;
+            }
 
         });
     }
 
-    protected Long getTimePrediction(Double prediction, Long timeMillis) {
-
-        if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
-            return -1L;
-        } else if (prediction < timeMillis) {
-            return 0L;
-        } else {
-            return Math.max(0, Math.round(prediction) - timeMillis);
-        }
-
-    }
-
     /**
      * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
      *
@@ -112,16 +128,24 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
      */
     public Long getTimeToBytesBackpressureMillis() {
 
-        final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+        final MultivariateStatusAnalyticsModel bytesModel = (MultivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+        FlowFileEvent flowFileEvent = getStatusReport();
+
         final Connection connection = getConnection();
         if (connection == null) {
             throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
         }
         final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
         final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
-        final double prediction = bytesModel.predictX(backPressureBytes);
-        return getTimePrediction(prediction, System.currentTimeMillis());
 
+        if(validModel(bytesModel) && flowFileEvent != null) {
+            List<Tuple<Integer, Double>> predictFeatures = new ArrayList<>();
+            Double inOutRatio = (flowFileEvent.getContentSizeOut() / (double)flowFileEvent.getContentSizeIn());
+            predictFeatures.add(new Tuple<>(1, inOutRatio));
+            return getTimePrediction(bytesModel.predictVariable(0, predictFeatures, backPressureBytes), System.currentTimeMillis());
+        }else{
+            return -1L;
+        }
     }
 
     /**
@@ -131,14 +155,24 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
      */
     public Long getTimeToCountBackpressureMillis() {
 
-        final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+        final MultivariateStatusAnalyticsModel countModel = (MultivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+        FlowFileEvent flowFileEvent = getStatusReport();
+
         final Connection connection = getConnection();
         if (connection == null) {
             throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
         }
+
         final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
-        final double prediction = countModel.predictX(backPressureCountThreshold);
-        return getTimePrediction(prediction, System.currentTimeMillis());
+
+        if(validModel(countModel) && flowFileEvent != null) {
+            List<Tuple<Integer, Double>> predictFeatures = new ArrayList<>();
+            Double inOutRatio = (flowFileEvent.getFlowFilesOut() / (double)flowFileEvent.getFlowFilesIn());
+            predictFeatures.add(new Tuple<>(1, inOutRatio));
+            return getTimePrediction(countModel.predictVariable(0, predictFeatures, backPressureCountThreshold), System.currentTimeMillis());
+        }else{
+            return -1L;
+        }
     }
 
     /**
@@ -148,11 +182,17 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
      */
 
     public Long getNextIntervalBytes() {
-        final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
-        final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
-        if (!Double.isNaN(prediction) && prediction >= 0) {
-            return Math.round(prediction);
-        } else {
+        final VariateStatusAnalyticsModel bytesModel = (VariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+        FlowFileEvent flowFileEvent = getStatusReport();
+
+        if(validModel(bytesModel) && flowFileEvent != null) {
+            List<Double> predictFeatures = new ArrayList<>();
+            Long nextInterval =  System.currentTimeMillis() + getIntervalTimeMillis();
+            Double inOutRatio = flowFileEvent.getContentSizeOut() / (double)flowFileEvent.getContentSizeIn();
+            predictFeatures.add(nextInterval.doubleValue());
+            predictFeatures.add(inOutRatio);
+            return  (bytesModel.predict(predictFeatures.toArray(new Double[2]))).longValue();
+        }else{
             return -1L;
         }
     }
@@ -164,13 +204,20 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
      */
 
     public Long getNextIntervalCount() {
-        final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
-        final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
-        if (!Double.isNaN(prediction) && prediction >= 0) {
-            return Math.round(prediction);
-        } else {
+        final VariateStatusAnalyticsModel countModel = (VariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+        FlowFileEvent flowFileEvent = getStatusReport();
+
+        if(validModel(countModel) && flowFileEvent != null) {
+            List<Double> predictFeatures = new ArrayList<>();
+            Long nextInterval =  System.currentTimeMillis() + getIntervalTimeMillis();
+            Double inOutRatio = flowFileEvent.getFlowFilesOut()/ (double)flowFileEvent.getFlowFilesIn();
+            predictFeatures.add(nextInterval.doubleValue());
+            predictFeatures.add(inOutRatio);
+            return (countModel.predict(predictFeatures.toArray(new Double[2]))).longValue();
+        }else{
             return -1L;
         }
+
     }
 
     public Long getNextIntervalPercentageUseCount() {
@@ -183,7 +230,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         final long nextIntervalCount = getNextIntervalCount();
 
         if (nextIntervalCount > -1L) {
-            return Math.round((getNextIntervalCount() / backPressureCountThreshold) * 100);
+            return Math.min(100, Math.round((nextIntervalCount / backPressureCountThreshold) * 100));
         } else {
             return -1L;
         }
@@ -201,14 +248,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         final long nextIntervalBytes = getNextIntervalBytes();
 
         if (nextIntervalBytes > -1L) {
-            return Math.round((getNextIntervalBytes() / backPressureBytes) * 100);
+            return Math.min(100, Math.round((getNextIntervalBytes() / backPressureBytes) * 100));
         } else {
             return -1L;
         }
     }
 
     public Long getIntervalTimeMillis() {
-        return (5L * 60 * 1000);
+        return 3L * 60 * 1000;
     }
 
     @Override
@@ -249,24 +296,73 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         return connection.orElse(null);
     }
 
+    private FlowFileEvent getStatusReport(){
+        RepositoryStatusReport statusReport =  flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
+        return statusReport.getReportEntry(this.connectionIdentifier);
+    }
+
     private interface ExtractFunction {
-        Tuple<Stream<Double>, Stream<Double>> extractMetric(String metric, StatusHistory statusHistory);
+        Tuple<Stream<Double[]>, Stream<Double>> extractMetric(String metric, StatusHistory statusHistory);
+    }
+
+    private Long getTimePrediction(Double prediction, Long timeMillis) {
+
+        if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
+            return -1L;
+        } else if (prediction < timeMillis) {
+            return 0L;
+        } else {
+            return Math.max(0, Math.round(prediction) - timeMillis);
+        }
+    }
+
+    private boolean validModel(VariateStatusAnalyticsModel model){
+
+        Double rSquared = model.getRSquared();
+
+        if (rSquared == null || (Doubles.isFinite(rSquared) && !Double.isNaN(rSquared) && rSquared < SCORE_THRESHOLD)) {
+            if(supportOnlineLearning && model.supportsOnlineLearning()){
+                model.clear();
+            }
+            return false;
+        }else {
+            return true;
+        }
     }
 
     private final ExtractFunction extract = (metric, statusHistory) -> {
 
         List<Double> values = new ArrayList<>();
-        List<Double> times = new ArrayList<>();
-
+        List<Double[]> features = new ArrayList<>();
+        Random rand = new Random();
         StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory);
 
         for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) {
+            List<Double> featureArray = new ArrayList<>();
             Long snapValue = snap.getStatusMetrics().get(metric);
             long snapTime = snap.getTimestamp().getTime();
+
+            featureArray.add((double) snapTime);
+            Double randomError = + (rand.nextInt(1000) * .0000001);
+            if (metric.equals(ConnectionStatusDescriptor.QUEUED_COUNT.getField())) {
+
+                Long inputCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.INPUT_COUNT.getField());
+                Long outputCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.OUTPUT_COUNT.getField());
+                Double inOutRatio = ((double)outputCount /(double)inputCount) + randomError;
+                featureArray.add(Double.isNaN(inOutRatio)? randomError : inOutRatio);
+
+            } else {
+                Long inputBytes = snap.getStatusMetrics().get(ConnectionStatusDescriptor.INPUT_BYTES.getField());
+                Long outputBytes = snap.getStatusMetrics().get(ConnectionStatusDescriptor.OUTPUT_BYTES.getField());
+                Double inOutRatio = ((double)outputBytes/(double)inputBytes) + randomError;
+                featureArray.add(Double.isNaN(inOutRatio)? randomError : inOutRatio);
+            }
+
             values.add((double) snapValue);
-            times.add((double) snapTime);
+            features.add(featureArray.toArray(new Double[featureArray.size()]));
+
         }
-        return new Tuple<>(times.stream(), values.stream());
+        return new Tuple<Stream<Double[]>, Stream<Double>>(features.stream(), values.stream());
 
     };
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
index 8f1222c..6f261a0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
@@ -17,24 +17,26 @@
 package org.apache.nifi.controller.status.analytics;
 
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
-    private ComponentStatusRepository statusRepository;
-    private FlowManager flowManager;
-
     private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class);
+    private final ComponentStatusRepository statusRepository;
+    private final FlowManager flowManager;
+    private final FlowFileEventRepository flowFileEventRepository;
 
-    public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) {
+    public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository) {
         this.flowManager = flowManager;
         this.statusRepository = statusRepository;
+        this.flowFileEventRepository = flowFileEventRepository;
     }
 
     @Override
     public StatusAnalytics getStatusAnalytics(String identifier) {
-        ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,identifier,false);
+        ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, flowFileEventRepository, identifier, false);
         connectionStatusAnalytics.init();
         return connectionStatusAnalytics;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java
similarity index 68%
copy from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
copy to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java
index eff661b..a1e2729 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/BivariateStatusAnalyticsModel.java
@@ -14,25 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.controller.status.analytics;
+package org.apache.nifi.controller.status.analytics.models;
 
+import java.util.Map;
 import java.util.stream.Stream;
 
-public abstract class BivariateStatusAnalyticsModel implements StatusAnalyticsModel {
+public abstract class BivariateStatusAnalyticsModel implements VariateStatusAnalyticsModel {
 
+    public abstract void learn(Stream<Double[]> features, Stream<Double> labels);
 
-    public abstract void learn(Stream<Double> features, Stream<Double> labels);
+    public  Double predict(Double[] feature){
+        return predictY(feature[0]);
+    }
 
-    public abstract Double predict(Double feature);
+    public Boolean supportsOnlineLearning() {
+        return false;
+    }
 
     public abstract Double predictX(Double y);
 
     public abstract Double predictY(Double x);
 
+    public abstract Double getRSquared();
+
+    public abstract Map<String,Double> getScores();
+
     @Override
-    public Boolean supportsOnlineLearning() {
-        return false;
+    public void clear() {
     }
 
-
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java
similarity index 58%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java
index eff661b..f4a98ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/MultivariateStatusAnalyticsModel.java
@@ -14,25 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.controller.status.analytics;
 
-import java.util.stream.Stream;
-
-public abstract class BivariateStatusAnalyticsModel implements StatusAnalyticsModel {
+package org.apache.nifi.controller.status.analytics.models;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
 
-    public abstract void learn(Stream<Double> features, Stream<Double> labels);
+import org.apache.nifi.util.Tuple;
 
-    public abstract Double predict(Double feature);
+public abstract class MultivariateStatusAnalyticsModel implements VariateStatusAnalyticsModel {
 
-    public abstract Double predictX(Double y);
+    public abstract void learn(Stream<Double[]> features, Stream<Double> labels);
 
-    public abstract Double predictY(Double x);
+    public abstract Double predict(Double[] feature);
 
     @Override
     public Boolean supportsOnlineLearning() {
         return false;
     }
 
+    public abstract Double predictVariable(Integer variableIndex, List<Tuple<Integer,Double>> predictorVariablesWithIndex, Double label);
+
+    public abstract Double getRSquared();
+
+    public abstract Map<String,Double> getScores();
+
+    @Override
+    public void clear() {
+
+    }
 
 }
+
+
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java
new file mode 100644
index 0000000..4340a8c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/OrdinaryLeastSquaresMSAM.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics.models;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Multivariate model backed by an ordinary least squares fit; every query returns null until learn() has succeeded. */
+public class OrdinaryLeastSquaresMSAM extends MultivariateStatusAnalyticsModel {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OrdinaryLeastSquaresMSAM.class);
+    private final OLSMultipleLinearRegression olsModel;
+    private double[] coefficients;
+
+    public OrdinaryLeastSquaresMSAM() {
+        this.olsModel = new OLSMultipleLinearRegression();
+    }
+
+    /** Replaces the sample data with the given rows and caches the estimated regression parameters. */
+    @Override
+    public void learn(Stream<Double[]> features, Stream<Double> labels) {
+        double[] labelArray = ArrayUtils.toPrimitive(labels.toArray(Double[]::new));
+        double[][] featuresMatrix = features.map(feature -> ArrayUtils.toPrimitive(feature)).toArray(double[][]::new);
+        this.olsModel.newSampleData(labelArray, featuresMatrix);
+        this.coefficients = olsModel.estimateRegressionParameters();
+    }
+
+    /** Returns the intercept plus the coefficient-weighted sum of the features, or null when nothing has been learned. */
+    @Override
+    public Double predict(Double[] feature) {
+        if (coefficients == null) {
+            return null;
+        }
+        // With an intercept the slopes start at index 1; without one they start at index 0.
+        final int offset = olsModel.isNoIntercept() ? 0 : 1;
+        final double intercept = olsModel.isNoIntercept() ? 0 : coefficients[0];
+        double sumX = 0;
+        for (int i = 0; i < feature.length; i++) {
+            sumX += coefficients[i + offset] * feature[i];
+        }
+        return sumX + intercept;
+    }
+
+    /** Solves the fitted equation for the variable at variableIndex, given the other variables' values and the label; null before learning. */
+    @Override
+    public Double predictVariable(Integer variableIndex, List<Tuple<Integer, Double>> predictorVariablesWithIndex, Double label) {
+        if (coefficients == null) {
+            return null;
+        }
+        // Apply the same index offset to the solved-for variable as to the fixed predictors.
+        final int offset = olsModel.isNoIntercept() ? 0 : 1;
+        final double intercept = olsModel.isNoIntercept() ? 0 : coefficients[0];
+        final double predictorCoeff = coefficients[variableIndex + offset];
+        final double sumX = predictorVariablesWithIndex.stream().map(featureTuple -> coefficients[featureTuple.getKey() + offset] * featureTuple.getValue())
+                                                       .collect(Collectors.summingDouble(Double::doubleValue));
+        return (label - intercept - sumX) / predictorCoeff;
+    }
+
+    /** Returns the R-squared and total sum of squares of the current fit, or null when nothing has been learned. */
+    @Override
+    public Map<String, Double> getScores() {
+        if (coefficients == null) {
+            return null;
+        }
+        Map<String, Double> scores = new HashMap<>();
+        scores.put("rSquared", olsModel.calculateRSquared());
+        scores.put("totalSumOfSquares", olsModel.calculateTotalSumOfSquares());
+        return scores;
+    }
+
+    /** Returns the R-squared of the current fit, or null when nothing has been learned. */
+    @Override
+    public Double getRSquared() {
+        if (coefficients == null) {
+            return null;
+        }
+        return olsModel.calculateRSquared();
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java
similarity index 54%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java
index 21c0370..39ffce3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/SimpleRegressionBSAM.java
@@ -14,12 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.controller.status.analytics;
+package org.apache.nifi.controller.status.analytics.models;
 
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.stat.regression.RegressionResults;
 import org.apache.commons.math3.stat.regression.SimpleRegression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +32,7 @@ public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleRegressionBSAM.class);
     private final SimpleRegression regression;
     private final Boolean clearObservationsOnLearn;
+    private RegressionResults results;
 
     public SimpleRegressionBSAM(Boolean clearObservationsOnLearn) {
 
@@ -37,25 +41,22 @@ public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel {
     }
 
     @Override
-    public void learn(Stream<Double> features, Stream<Double> labels) {
+    public void learn(Stream<Double[]> features, Stream<Double> labels) {
         double[] labelArray = ArrayUtils.toPrimitive(labels.toArray(Double[]::new));
-        double[][] featuresMatrix = features.map(feature -> new double[]{feature}).toArray(double[][]::new);
+        double[][] featuresMatrix = features.map(feature -> ArrayUtils.toPrimitive(feature)).toArray(double[][]::new);
 
-        if(clearObservationsOnLearn) {
+        if (clearObservationsOnLearn) {
             regression.clear();
         }
 
         regression.addObservations(featuresMatrix, labelArray);
-        LOG.debug("Model is using equation: y = {}x + {}", regression.getSlope(), regression.getIntercept());
+        results = regression.regress();
+        LOG.debug("Model is using equation: y = {}x + {}, with R-squared {}, RMSE {}", regression.getSlope(), regression.getIntercept(),
+                                                                                       results.getRSquared(), Math.sqrt(results.getMeanSquareError()));
 
     }
 
     @Override
-    public Double predict(Double feature) {
-        return predictY(feature);
-    }
-
-    @Override
     public Double predictX(Double y) {
         return (y - regression.getIntercept()) / regression.getSlope();
     }
@@ -64,4 +65,39 @@ public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel {
     public Double predictY(Double x) {
         return regression.getSlope() * x + regression.getIntercept();
     }
+
+    @Override
+    public Double getRSquared() {
+        // No regression results are cached until learn() runs, and clear() drops them again.
+        if (results == null) {
+            return null;
+        }
+        return results.getRSquared();
+    }
+
+    @Override
+    public Map<String, Double> getScores() {
+        // Scores are read from the cached regression results; without them there is nothing to report.
+        if (results == null) {
+            return null;
+        }
+        final Map<String, Double> scores = new HashMap<>();
+        scores.put("rSquared", results.getRSquared());
+        scores.put("adjustedRSquared", results.getAdjustedRSquared());
+        scores.put("residualSumSquares", results.getErrorSumSquares());
+        scores.put("meanSquareError", results.getMeanSquareError());
+        return scores;
+    }
+
+    @Override
+    public void clear() {
+        regression.clear(); // reset the underlying regression
+        results = null; // the cached fit would no longer describe the regression
+    }
+
+    @Override
+    public Boolean supportsOnlineLearning() {
+        return Boolean.TRUE; // unlike the bivariate base class, which answers false
+    }
+
 }
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java
similarity index 67%
copy from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
copy to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java
index 72c81b1..44ce6b5 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/models/VariateStatusAnalyticsModel.java
@@ -14,14 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.controller.status.analytics;
+package org.apache.nifi.controller.status.analytics.models;
 
+import java.util.Map;
 import java.util.stream.Stream;
 
-public interface StatusAnalyticsModel {
+import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel;
+
+public interface VariateStatusAnalyticsModel extends StatusAnalyticsModel {
+
+    void learn(Stream<Double[]> features, Stream<Double> labels);
+
+    Double predict(Double[] feature);
 
-    void learn(Stream<Double> features, Stream<Double> labels);
-    Double predict(Double feature);
     Boolean supportsOnlineLearning();
 
+    Double getRSquared();
+
+    Map<String,Double> getScores();
+
+    void clear();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
index 9fb67db..77ffa9b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestCachingConnectionStatusAnalyticsEngine.java
@@ -17,19 +17,20 @@
 package org.apache.nifi.controller.status.analytics;
 
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.junit.Test;
 
 public class TestCachingConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngine {
 
     @Override
-    public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository componentStatusRepository) {
-        return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
+    public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository componentStatusRepository) {
+        return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository);
     }
 
     @Test
     public void testCachedStatusAnalytics() {
-        StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository);
+        StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository);
         StatusAnalytics statusAnalyticsA = statusAnalyticsEngine.getStatusAnalytics("A");
         StatusAnalytics statusAnalyticsB = statusAnalyticsEngine.getStatusAnalytics("B");
         StatusAnalytics statusAnalyticsTest = statusAnalyticsEngine.getStatusAnalytics("A");
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
index 1f5d520..b013ed4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.status.analytics;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.when;
 
@@ -32,6 +33,9 @@ import java.util.stream.Collectors;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileEvent;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.RepositoryStatusReport;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.MetricDescriptor;
@@ -57,6 +61,9 @@ public class TestConnectionStatusAnalytics {
         final StatusHistory statusHistory = Mockito.mock(StatusHistory.class);
         final Connection connection = Mockito.mock(Connection.class);
         final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        final FlowFileEventRepository flowFileEventRepository = Mockito.mock(FlowFileEventRepository.class);
+        final RepositoryStatusReport repositoryStatusReport = Mockito.mock(RepositoryStatusReport.class);
+        final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class);
         final List<Connection> connections = new ArrayList<>();
         final String connectionIdentifier = "1";
         connections.add(connection);
@@ -65,11 +72,20 @@ public class TestConnectionStatusAnalytics {
         final long startTime = System.currentTimeMillis();
         int iterations = 10;
 
+        Long inputBytes = queuedBytes * 2;
+        Long outputBytes = inputBytes - queuedBytes;
+        Long inputCount = queuedCount * 2;
+        Long outputCount = inputCount - queuedCount;
+
         for (int i = 0; i < iterations; i++) {
             final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(CONNECTION_METRICS);
             snapshot.setTimestamp(new Date(startTime + i * 1000));
             snapshot.addStatusMetric(ConnectionStatusDescriptor.QUEUED_BYTES.getDescriptor(), (isConstantStatus || i < 5) ? queuedBytes : queuedBytes * 2);
             snapshot.addStatusMetric(ConnectionStatusDescriptor.QUEUED_COUNT.getDescriptor(), (isConstantStatus || i < 5) ? queuedCount : queuedCount * 2);
+            snapshot.addStatusMetric(ConnectionStatusDescriptor.INPUT_BYTES.getDescriptor(), (isConstantStatus || i < 5) ? inputBytes : inputBytes * 2);
+            snapshot.addStatusMetric(ConnectionStatusDescriptor.INPUT_COUNT.getDescriptor(), (isConstantStatus || i < 5) ? inputCount : inputCount * 2);
+            snapshot.addStatusMetric(ConnectionStatusDescriptor.OUTPUT_BYTES.getDescriptor(), (isConstantStatus || i < 5) ? outputBytes : outputBytes * 2);
+            snapshot.addStatusMetric(ConnectionStatusDescriptor.OUTPUT_COUNT.getDescriptor(), (isConstantStatus || i < 5) ? outputCount : outputCount * 2);
             snapshotList.add(snapshot);
         }
 
@@ -80,8 +96,14 @@ public class TestConnectionStatusAnalytics {
         when(processGroup.findAllConnections()).thenReturn(connections);
         when(statusHistory.getStatusSnapshots()).thenReturn(snapshotList);
         when(flowManager.getRootGroup()).thenReturn(processGroup);
+        when(flowFileEvent.getContentSizeIn()).thenReturn(inputBytes);
+        when(flowFileEvent.getContentSizeOut()).thenReturn(outputBytes);
+        when(flowFileEvent.getFlowFilesIn()).thenReturn(inputCount.intValue());
+        when(flowFileEvent.getFlowFilesOut()).thenReturn(outputCount.intValue());
+        when(flowFileEventRepository.reportTransferEvents(anyLong())).thenReturn(repositoryStatusReport);
+        when(repositoryStatusReport.getReportEntry(anyString())).thenReturn(flowFileEvent);
         when(statusRepository.getConnectionStatusHistory(anyString(), any(), any(), anyInt())).thenReturn(statusHistory);
-        ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, connectionIdentifier, false);
+        ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager,flowFileEventRepository, connectionIdentifier, false);
         connectionStatusAnalytics.init();
         return connectionStatusAnalytics;
     }
@@ -91,7 +113,7 @@ public class TestConnectionStatusAnalytics {
         ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
         Long interval = connectionStatusAnalytics.getIntervalTimeMillis();
         assertNotNull(interval);
-        assert (interval == 300000);
+        assert (interval == 180000);
     }
 
     @Test
@@ -99,7 +121,7 @@ public class TestConnectionStatusAnalytics {
         ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
         Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
         assertNotNull(countTime);
-        assert (countTime == -1L);
+        assert (countTime > 0);
     }
 
     @Test
@@ -107,7 +129,7 @@ public class TestConnectionStatusAnalytics {
         ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
         Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
         assertNotNull(countTime);
-        assert (countTime > -1L);
+        assert (countTime == -1L);
     }
 
     @Test
@@ -115,7 +137,7 @@ public class TestConnectionStatusAnalytics {
         ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
         Long bytesTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
         assertNotNull(bytesTime);
-        assert (bytesTime == -1L);
+        assert (bytesTime == -1L || bytesTime == 0);
     }
 
     @Test
@@ -123,7 +145,7 @@ public class TestConnectionStatusAnalytics {
         ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
         Long bytesTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
         assertNotNull(bytesTime);
-        assert (bytesTime > -1L);
+        assert (bytesTime == -1L);
     }
 
     @Test
@@ -139,7 +161,7 @@ public class TestConnectionStatusAnalytics {
         ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
         Long nextBytes = connectionStatusAnalytics.getNextIntervalBytes();
         assertNotNull(nextBytes);
-        assert (nextBytes > -1L);
+        assert (nextBytes == -1L);
     }
 
     @Test
@@ -152,10 +174,11 @@ public class TestConnectionStatusAnalytics {
 
     @Test
     public void testGetNextIntervalCountVaryingStatus() {
-        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
         Long nextCount = connectionStatusAnalytics.getNextIntervalCount();
         assertNotNull(nextCount);
-        assert (nextCount == 50L);
+        assert (nextCount == -1L);
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
index 2005e04..99c9fae 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalyticsEngine.java
@@ -17,13 +17,14 @@
 package org.apache.nifi.controller.status.analytics;
 
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 
 public class TestConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngine {
 
     @Override
-    public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) {
-        return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository);
+    public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository statusRepository) {
+        return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository);
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
index 183cd27..ee3d4e5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestStatusAnalyticsEngine.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 import java.util.Collections;
 
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.StatusHistory;
 import org.apache.nifi.controller.status.history.StatusSnapshot;
@@ -37,6 +38,7 @@ public abstract class TestStatusAnalyticsEngine {
 
     protected ComponentStatusRepository statusRepository;
     protected FlowManager flowManager;
+    protected FlowFileEventRepository flowFileEventRepository;
 
     @Before
     public void setup() {
@@ -53,11 +55,11 @@ public abstract class TestStatusAnalyticsEngine {
 
     @Test
     public void testGetStatusAnalytics() {
-        StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager, statusRepository);
+        StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository);
         StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics("1");
         assertNotNull(statusAnalytics);
     }
 
-    public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository componentStatusRepository);
+    public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, ComponentStatusRepository componentStatusRepository);
 
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java
new file mode 100644
index 0000000..d4775c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestOrdinaryLeastSqauresMSAM.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics.models;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.commons.math3.linear.SingularMatrixException;
+import org.apache.nifi.util.Tuple;
+import org.junit.Test;
+
+/** Unit tests for {@link OrdinaryLeastSquaresMSAM}. */
+public class TestOrdinaryLeastSqauresMSAM {
+
+    /**
+     * The ratio feature is constant here, which makes it collinear with the intercept term,
+     * so fitting is expected to fail with a {@link SingularMatrixException}.
+     */
+    @Test(expected = SingularMatrixException.class)
+    public void testConstantPrediction() {
+
+        Double timestamp = 1565444720000.0;
+        Double inputCount = 1000.0;
+        Double outputCount = 1000.0;
+        Double queueCount = 50.0;
+
+        Double[] feature0 = {timestamp - 1000, outputCount / inputCount};
+        Double[] feature1 = {timestamp, outputCount / inputCount};
+        Double[] feature2 = {timestamp + 1000, outputCount / inputCount};
+        Double[] feature3 = {timestamp + 2000, outputCount / inputCount};
+
+        Double[][] features = {feature0, feature1, feature2, feature3};
+        Double[] labels = {queueCount, queueCount, queueCount, queueCount};
+
+        OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM();
+        model.learn(Stream.of(features), Stream.of(labels));
+
+    }
+
+    /** Solves the fitted model for the timestamp (variable 0) at which the label reaches 750. */
+    @Test
+    public void testVaryingPredictionOfVariable() {
+
+        Double timestamp = 1565444720000.0;
+        Double inputCount = 1000.0;
+        Double outputCount = 50.0;
+        Double queueCount = 950.0;
+
+        Double[] feature0 = {timestamp, outputCount / inputCount};
+        Double[] feature1 = {timestamp + 1000, outputCount / (inputCount + 50)};
+        Double[] feature2 = {timestamp + 2000, (outputCount + 50) / (inputCount)};
+        Double[] feature3 = {timestamp + 3000, (outputCount + 100) / (inputCount - 100)};
+
+        Double[][] features = {feature0, feature1, feature2, feature3};
+        Double[] labels = {queueCount, queueCount + 50, queueCount - 50, queueCount - 100};
+
+        OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM();
+
+        model.learn(Stream.of(features), Stream.of(labels));
+
+        // Hold the ratio feature (index 1) fixed at 200/800 and solve for variable 0.
+        Tuple<Integer, Double> ratioPredictor = new Tuple<>(1, 200 / 800.0);
+        List<Tuple<Integer, Double>> predictorVars = new ArrayList<>();
+        predictorVars.add(ratioPredictor);
+        Double target = model.predictVariable(0, predictorVars, 750.0);
+        Double rSquared = model.getRSquared();
+        assertTrue("R-squared was " + rSquared, rSquared > .90);
+        Date targetDate = new Date(target.longValue());
+        Date testDate = new Date(timestamp.longValue());
+        assertTrue("Expected " + targetDate + " on the same day as " + testDate, DateUtils.isSameDay(targetDate, testDate));
+        assertTrue("Expected " + targetDate + " after " + testDate, targetDate.after(testDate));
+
+    }
+
+    /** Predicts the label at timestamp + 5000 using the initial output/input ratio. */
+    @Test
+    public void testVaryingPrediction() {
+
+        Double timestamp = 1565444720000.0;
+        Double inputCount = 1000.0;
+        Double outputCount = 50.0;
+        Double queueCount = 950.0;
+
+        Double[] feature0 = {timestamp, outputCount / inputCount};
+        Double[] feature1 = {timestamp + 1000, outputCount / (inputCount + 50)};
+        Double[] feature2 = {timestamp + 2000, (outputCount + 50) / (inputCount)};
+        Double[] feature3 = {timestamp + 3000, (outputCount + 100) / (inputCount - 100)};
+
+        Double[][] features = {feature0, feature1, feature2, feature3};
+        Double[] labels = {queueCount, queueCount + 50, queueCount - 50, queueCount - 100};
+
+        OrdinaryLeastSquaresMSAM model = new OrdinaryLeastSquaresMSAM();
+
+        Double[] predictor = {timestamp + 5000, outputCount / inputCount};
+
+        model.learn(Stream.of(features), Stream.of(labels));
+        Double target = model.predict(predictor);
+        Double rSquared = model.getRSquared();
+        assertTrue("R-squared was " + rSquared, rSquared > .90);
+        assertTrue("Prediction was " + target, target >= 950);
+
+    }
+
+    /** Fits both models on the same samples and expects the two-feature OLS fit to score a higher R-squared. */
+    @Test
+    public void comparePredictions() {
+
+        Double timestamp = 1565444720000.0;
+        Double inputCount = 1000.0;
+        Double outputCount = 50.0;
+        Double queueCount = 950.0;
+
+        Double[] feature0 = {timestamp, outputCount / inputCount};
+        Double[] feature1 = {timestamp + 1000, outputCount / (inputCount + 50)};
+        Double[] feature2 = {timestamp + 2000, (outputCount + 50) / (inputCount)};
+        Double[] feature3 = {timestamp + 3000, (outputCount + 100) / (inputCount - 100)};
+
+        Double[][] features = {feature0, feature1, feature2, feature3};
+        Double[] labels = {queueCount, queueCount + 50, queueCount - 50, queueCount - 100};
+
+        OrdinaryLeastSquaresMSAM ordinaryLeastSquaresMSAM = new OrdinaryLeastSquaresMSAM();
+        SimpleRegressionBSAM simpleRegressionBSAM = new SimpleRegressionBSAM(false);
+
+        ordinaryLeastSquaresMSAM.learn(Stream.of(features), Stream.of(labels));
+        simpleRegressionBSAM.learn(Stream.of(features), Stream.of(labels));
+        double olsR2 = ordinaryLeastSquaresMSAM.getRSquared();
+        double srR2 = simpleRegressionBSAM.getRSquared();
+        assertTrue(!Double.isNaN(olsR2));
+        assertTrue(!Double.isNaN(srR2));
+        // Both models must also expose their R-squared through the generic score map.
+        Map<String, Double> olsScores = ordinaryLeastSquaresMSAM.getScores();
+        Map<String, Double> srScores = simpleRegressionBSAM.getScores();
+        assertTrue("OLS scores were " + olsScores, olsScores.containsKey("rSquared"));
+        assertTrue("Simple regression scores were " + srScores, srScores.containsKey("rSquared"));
+        assertTrue("OLS R-squared " + olsR2 + " should exceed simple regression R-squared " + srR2, olsR2 > srR2);
+
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java
new file mode 100644
index 0000000..71da432
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/models/TestSimpleRegressionBSAM.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics.models;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.stream.Stream;
+import org.junit.Test;
+
+/** Unit tests for {@link SimpleRegressionBSAM}, trained on timestamp features and queue-count labels. */
+public class TestSimpleRegressionBSAM {
+
+    /** Base timestamp (presumably epoch milliseconds) that every sample feature is offset from. */
+    private static final double TIMESTAMP = 1565444720000.0;
+
+    /** Tolerance used when a prediction is compared with an exact expected value. */
+    private static final double DELTA = 1e-9;
+
+    /**
+     * Trains on four samples that all carry the same label and checks that the
+     * prediction for a later timestamp is that label.
+     */
+    @Test
+    public void testConstantPrediction() {
+        final double queueCount = 50.0;
+        final Double[][] features = {{TIMESTAMP - 1000}, {TIMESTAMP}, {TIMESTAMP + 1000}, {TIMESTAMP + 2000}};
+        final Double[] labels = {queueCount, queueCount, queueCount, queueCount};
+
+        final SimpleRegressionBSAM model = new SimpleRegressionBSAM(false);
+        model.learn(Stream.of(features), Stream.of(labels));
+
+        final Double target = model.predict(new Double[] {TIMESTAMP + 5000});
+
+        // JUnit assertions are used instead of the assert keyword, which is
+        // skipped unless the JVM runs with assertions enabled (-ea).
+        assertNotNull(target);
+        assertEquals(queueCount, target, DELTA);
+    }
+
+    /**
+     * Checks that {@code predictX(1000.0)} on the varying-label model returns a
+     * value between {@code minTimeMillis} and {@code maxTimeMillis}.
+     */
+    @Test
+    public void testVaryingPredictX() {
+        final double minTimeMillis = 1565343920000.0;
+        final double maxTimeMillis = 1565516720000.0;
+
+        final Double target = learnVaryingModel().predictX(1000.0);
+
+        assertNotNull(target);
+        assertTrue("predictX out of range: " + target, target >= minTimeMillis && target <= maxTimeMillis);
+    }
+
+    /**
+     * Checks that the varying-label model has an R-squared above 0.60 and that its
+     * prediction for a later timestamp lies between {@code minCount} and {@code maxCount}.
+     */
+    @Test
+    public void testVaryingPredictY() {
+        final double minCount = -1265.0;
+        final double maxCount = 3235.0;
+        final SimpleRegressionBSAM model = learnVaryingModel();
+
+        final Double target = model.predict(new Double[] {TIMESTAMP + 5000});
+        final double rSquared = model.getRSquared();
+
+        assertTrue("r-squared too low: " + rSquared, rSquared > .60);
+        assertNotNull(target);
+        assertTrue("predict out of range: " + target, target >= minCount && target <= maxCount);
+    }
+
+    /**
+     * Builds a model trained on four samples whose features are 1000 apart,
+     * starting at {@link #TIMESTAMP}, and whose labels are 950, 1000, 900 and 850.
+     */
+    private static SimpleRegressionBSAM learnVaryingModel() {
+        final double queueCount = 950.0;
+        final Double[][] features = {{TIMESTAMP}, {TIMESTAMP + 1000}, {TIMESTAMP + 2000}, {TIMESTAMP + 3000}};
+        final Double[] labels = {queueCount, queueCount + 50, queueCount - 50, queueCount - 100};
+
+        final SimpleRegressionBSAM model = new SimpleRegressionBSAM(false);
+        model.learn(Stream.of(features), Stream.of(labels));
+        return model;
+    }
+}