You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:43:06 UTC

[nifi] 12/23: NIFI-6510 Analytics Framework Introduction (#10)

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit e46dd3783a3e43dbbd139508776d6ca92b8cfc17
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Mon Jul 29 07:16:21 2019 -0400

    NIFI-6510 Analytics Framework Introduction (#10)
    
    * DFA-9 - Initial refactor for Status Analytics - created additional interfaces for models, refactored callers to use StatusAnalytics objects with connection context. Implemented SimpleRegression model.
    
    DFA-9 - added logging
    
    * DFA-9 - relocated query window to CSA from model, adding the prediction percentages and time interval
    
    * DFA-9 - checkstyle fixes
---
 .../nifi/controller/status/ConnectionStatus.java   |  19 ++
 .../controller/status/analytics/QueryWindow.java   |  59 +++++
 .../status/analytics/StatusAnalytics.java          |  26 +--
 .../status/analytics/StatusAnalyticsEngine.java    |   3 +-
 ...tusAnalytics.java => StatusAnalyticsModel.java} |  17 +-
 .../status/ConnectionStatisticsSnapshotDTO.java    |  37 ++-
 .../analytics/BivariateStatusAnalyticsModel.java   |  30 ++-
 .../CachingConnectionStatusAnalyticsEngine.java    | 147 ++----------
 .../analytics/ConnectionStatusAnalytics.java       | 254 +++++++++++++++++++++
 .../analytics/ConnectionStatusAnalyticsEngine.java | 121 +---------
 .../status/analytics/SimpleRegressionBSAM.java     |  55 +++++
 .../apache/nifi/reporting/StandardEventAccess.java |  15 +-
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  60 ++---
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |  98 ++++----
 .../nifi/web/controller/ControllerFacade.java      |  59 ++---
 15 files changed, 595 insertions(+), 405 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
index ee7dd45..783677a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
@@ -45,6 +45,8 @@ public class ConnectionStatus implements Cloneable {
     private long nextPredictedQueuedBytes;
     private long predictedTimeToCountBackpressureMillis;
     private long predictedTimeToBytesBackpressureMillis;
+    private int predictedPercentCount = 0;
+    private int predictedPercentBytes = 0;
 
     public String getId() {
         return id;
@@ -231,6 +233,23 @@ public class ConnectionStatus implements Cloneable {
         this.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis;
     }
 
+    public int getPredictedPercentCount() {
+        return predictedPercentCount;
+    }
+
+    public void setPredictedPercentCount(int predictedPercentCount) {
+        this.predictedPercentCount = predictedPercentCount;
+    }
+
+    public int getPredictedPercentBytes() {
+        return predictedPercentBytes;
+    }
+
+    public void setPredictedPercentBytes(int predictedPercentBytes) {
+        this.predictedPercentBytes = predictedPercentBytes;
+    }
+
+
     @Override
     public ConnectionStatus clone() {
         final ConnectionStatus clonedObj = new ConnectionStatus();
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java
new file mode 100644
index 0000000..c477872
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.Date;
+
+public class QueryWindow {
+
+    private long startTimeMillis;
+    private long endTimeMillis;
+
+    public QueryWindow(long startTimeMillis, long endTimeMillis) {
+        this.startTimeMillis = startTimeMillis;
+        this.endTimeMillis = endTimeMillis;
+    }
+
+    public long getStartTimeMillis() {
+        return this.startTimeMillis;
+    }
+
+    public Date getStartDateTime() {
+        return new Date(this.startTimeMillis);
+    }
+
+    public void setStartTimeMillis(long startTimeMillis) {
+        this.startTimeMillis = startTimeMillis;
+    }
+
+    public long getEndTimeMillis() {
+        return this.endTimeMillis;
+    }
+
+    public Date getEndDateTime() {
+        return new Date(this.endTimeMillis);
+    }
+
+    public void setEndTimeMillis(long endTimeMillis) {
+        this.endTimeMillis = endTimeMillis;
+    }
+
+    /** Length of the window: end minus start, in milliseconds (negative if end precedes start; no ordering is enforced). */
+    public long getTimeDifferenceMillis(){
+        return this.endTimeMillis - this.startTimeMillis;
+    }
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index 564f1c9..a65629f 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -16,32 +16,16 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.Map;
+
 /**
  * The StatusAnalytics interface offers methods for accessing predicted and other values for a single component (Connection instance, e.g.)
  */
 public interface StatusAnalytics {
 
-    /**
-     * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
-     * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
-     */
-    long getTimeToBytesBackpressureMillis();
-
-    /**
-     * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
-     * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
-     */
-    long getTimeToCountBackpressureMillis();
+    QueryWindow getQueryWindow();
+    Map<String,Long> getPredictions();
+    boolean supportsOnlineLearning();
 
-    /**
-     * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
-     * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
-     */
-    long getNextIntervalBytes();
 
-    /**
-     * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
-     * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
-     */
-    int getNextIntervalCount();
 }
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java
index 5cbc333..01021c2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java
@@ -18,5 +18,6 @@ package org.apache.nifi.controller.status.analytics;
 
 public interface StatusAnalyticsEngine {
 
-    ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId);
+    StatusAnalytics getStatusAnalytics(String componentId);
+
 }
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
similarity index 66%
copy from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
copy to nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
index 9792ae4..72c81b1 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
@@ -16,17 +16,12 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.stream.Stream;
 
-/**
- * The ConnectionStatusAnalytics interface offers additional methods to the StatusAnalytics interface related to the supporting connection information (group ID, e.g.)
- */
-public interface ConnectionStatusAnalytics extends StatusAnalytics{
+public interface StatusAnalyticsModel {
+
+    void learn(Stream<Double> features, Stream<Double> labels);
+    Double predict(Double feature);
+    Boolean supportsOnlineLearning();
 
-    String getGroupId();
-    String getId();
-    String getName();
-    String getSourceId();
-    String getSourceName();
-    String getDestinationId();
-    String getDestinationName();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
index 526bdcf..a521db4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
@@ -16,10 +16,10 @@
  */
 package org.apache.nifi.web.api.dto.status;
 
-import io.swagger.annotations.ApiModelProperty;
-
 import javax.xml.bind.annotation.XmlType;
 
+import io.swagger.annotations.ApiModelProperty;
+
 /**
  * DTO for serializing the statistics of a connection.
  */
@@ -39,6 +39,9 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
     private Long predictedMillisUntilBytesBackpressure = 0L;
     private Integer predictedCountAtNextInterval = 0;
     private Long predictedBytesAtNextInterval = 0L;
+    private Integer predictedPercentCount = 0;
+    private Integer predictedPercentBytes = 0;
+    private Long predictionIntervalMillis = 0L;
 
     /* getters / setters */
     /**
@@ -161,6 +164,33 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
         this.predictedBytesAtNextInterval = predictedBytesAtNextInterval;
     }
 
+    @ApiModelProperty("The predicted percentage of queued objects at the next configured interval.")
+    public Integer getPredictedPercentCount() {
+        return predictedPercentCount;
+    }
+
+    public void setPredictedPercentCount(Integer predictedPercentCount) {
+        this.predictedPercentCount = predictedPercentCount;
+    }
+
+    @ApiModelProperty("The predicted percentage of bytes in the queue against current threshold at the next configured interval.")
+    public Integer getPredictedPercentBytes() {
+        return predictedPercentBytes;
+    }
+
+    public void setPredictedPercentBytes(Integer predictedPercentBytes) {
+        this.predictedPercentBytes = predictedPercentBytes;
+    }
+
+    @ApiModelProperty("The prediction interval in milliseconds.")
+    public Long getPredictionIntervalMillis() {
+        return predictionIntervalMillis;
+    }
+
+    public void setPredictionIntervalMillis(Long predictionIntervalMillis) {
+        this.predictionIntervalMillis = predictionIntervalMillis;
+    }
+
     @Override
     public ConnectionStatisticsSnapshotDTO clone() {
         final ConnectionStatisticsSnapshotDTO other = new ConnectionStatisticsSnapshotDTO();
@@ -176,6 +206,9 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
         other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure());
         other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval());
         other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval());
+        other.setPredictedPercentCount(getPredictedPercentCount());
+        other.setPredictedPercentBytes(getPredictedPercentBytes());
+        other.setPredictionIntervalMillis(getPredictionIntervalMillis());
 
         return other;
     }
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
similarity index 66%
rename from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
index 9792ae4..eff661b 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
@@ -16,17 +16,23 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.stream.Stream;
+
+public abstract class BivariateStatusAnalyticsModel implements StatusAnalyticsModel {
+
+
+    public abstract void learn(Stream<Double> features, Stream<Double> labels);
+
+    public abstract Double predict(Double feature);
+
+    public abstract Double predictX(Double y);
+
+    public abstract Double predictY(Double x);
+
+    @Override
+    public Boolean supportsOnlineLearning() {
+        return false;
+    }
+
 
-/**
- * The ConnectionStatusAnalytics interface offers additional methods to the StatusAnalytics interface related to the supporting connection information (group ID, e.g.)
- */
-public interface ConnectionStatusAnalytics extends StatusAnalytics{
-
-    String getGroupId();
-    String getId();
-    String getName();
-    String getSourceId();
-    String getSourceName();
-    String getDestinationId();
-    String getDestinationName();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
index c12dbae..c4836c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
@@ -16,19 +16,10 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
-import java.util.Date;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.math3.stat.regression.SimpleRegression;
-import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
-import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
-import org.apache.nifi.controller.status.history.StatusHistoryUtil;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,141 +29,33 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
     private ComponentStatusRepository statusRepository;
     private FlowController controller;
-    private volatile Cache<String, SimpleRegression> cache;
-    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class);
+    private volatile Cache<String, ConnectionStatusAnalytics> cache;
+    private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class);
 
     public CachingConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) {
         this.controller = controller;
         this.statusRepository = statusRepository;
         this.cache = Caffeine.newBuilder()
-                .expireAfterWrite(1, TimeUnit.MINUTES)
+                .expireAfterWrite(5, TimeUnit.MINUTES)
                 .build();
     }
 
     @Override
-    public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) {
-
-        ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
-        Connection connection = rootGroup.findConnection(connectionId);
-        SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier());
-
-        if (cachedRegression == null) {
-            cachedRegression = getBackPressureRegressionModel(connection);
-            if (cachedRegression != null)
-                cache.put(connection.getIdentifier(), cachedRegression);
+    public StatusAnalytics getStatusAnalytics(String identifier) {
+        // Cache miss: build and train a new analytics object; cache hit: re-train the cached one.
+        ConnectionStatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier);
+        if(connectionStatusAnalytics == null){
+            LOG.info("Creating new analytics for connection id: {}", identifier);
+            connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier);
+            connectionStatusAnalytics.init();
+            cache.put(identifier,connectionStatusAnalytics);
+        }else{
+            LOG.info("Pulled existing analytics from cache for connection id: {}", identifier);
+            connectionStatusAnalytics.refresh();
         }
-
-        ConnectionStatusAnalytics cachedResult = calculate(cachedRegression, connection);
-        LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getTimeToCountBackpressureMillis());
-        return cachedResult;
-    }
-
-    protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn) {
-        long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
-
-        final long connTimeToBackpressure;
-
-        if (regression == null) {
-            connTimeToBackpressure = Long.MAX_VALUE;
-        } else {
-            //If calculation returns as negative only 0 will return
-            connTimeToBackpressure = Math.max(0, Math.round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
-                    - System.currentTimeMillis());
-        }
-
-        return new ConnectionStatusAnalytics() {
-
-            @Override
-            public String getSourceName() {
-                return conn.getSource().getName();
-            }
-
-            @Override
-            public String getSourceId() {
-                return conn.getSource().getIdentifier();
-            }
-
-            @Override
-            public String getName() {
-                return conn.getName();
-            }
-
-
-            @Override
-            public String getId() {
-                return conn.getIdentifier();
-            }
-
-            @Override
-            public long getTimeToBytesBackpressureMillis() {
-                return 0;
-            }
-
-            @Override
-            public long getTimeToCountBackpressureMillis() {
-                return connTimeToBackpressure;
-            }
-
-            @Override
-            public long getNextIntervalBytes() {
-                return 0;
-            }
-
-            @Override
-            public int getNextIntervalCount() {
-                return 0;
-            }
-
-            @Override
-            public String getGroupId() {
-                return conn.getProcessGroup().getIdentifier();
-            }
-
-            @Override
-            public String getDestinationName() {
-                return conn.getDestination().getName();
-            }
-
-            @Override
-            public String getDestinationId() {
-                return conn.getDestination().getIdentifier();
-            }
-        };
+        return connectionStatusAnalytics;
 
     }
 
-    /**
-     * Get backpressure model based on current data
-     *
-     * @param conn the connection to run the analytic on
-     * @return
-     */
-    protected SimpleRegression getBackPressureRegressionModel(Connection conn) {
-        Date minDate = new Date(System.currentTimeMillis() - (60 * 1000));
-        StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
-                statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
-        List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
-
-        if (aggregateSnapshots.size() < 2) {
-            LOG.info("Not enough data to model time to backpressure.");
-            return null;
-        } else {
-
-            ConnectionStatusDescriptor.QUEUED_COUNT.getField();
-            SimpleRegression regression = new SimpleRegression();
 
-            for (StatusSnapshotDTO snap : aggregateSnapshots) {
-                Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
-                long snapTime = snap.getTimestamp().getTime();
-                regression.addData(snapTime, snapQueuedCount);
-            }
-
-            if (regression.getSlope() <= 0 && !conn.getFlowFileQueue().isFull()) {
-                LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
-                return null;
-            } else {
-                return regression;
-            }
-        }
-    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
new file mode 100644
index 0000000..bad2ff1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.util.Tuple;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionStatusAnalytics implements StatusAnalytics {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalytics.class);
+    private Map<String, Tuple<StatusAnalyticsModel, ExtractFunction>> modelMap;
+    private QueryWindow queryWindow;
+    private final ComponentStatusRepository componentStatusRepository;
+    private final String connectionIdentifier;
+    private final FlowController flowController;
+
+    public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowController flowController, String connectionIdentifier) {
+        this.componentStatusRepository = componentStatusRepository;
+        this.flowController = flowController;
+        this.connectionIdentifier = connectionIdentifier;
+    }
+
+    public void init() {
+        if (this.modelMap == null || this.modelMap.isEmpty()) {
+            Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract);
+            Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract);
+            this.modelMap = new HashMap<>();
+            //TODO: Should change keys used here
+            this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), countModelFunction);
+            this.modelMap.put(ConnectionStatusDescriptor.QUEUED_BYTES.getField(), byteModelFunction);
+            // Read the clock once so the window is exactly five minutes wide; two reads could straddle a tick.
+            final long now = System.currentTimeMillis();
+            this.queryWindow = new QueryWindow(now - (5 * 60 * 1000), now);
+        }
+        refresh();
+    }
+
+    public void refresh() {
+        // Query once per refresh: the window is the same for every metric, so all models train on one snapshot.
+        final StatusHistory statusHistory = componentStatusRepository.getConnectionStatusHistory(
+                connectionIdentifier, queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), Integer.MAX_VALUE);
+
+        modelMap.forEach((metric, modelFunction) -> {
+            final StatusAnalyticsModel model = modelFunction.getKey();
+            final ExtractFunction extractFunction = modelFunction.getValue();
+            final Tuple<Stream<Double>, Stream<Double>> modelData = extractFunction.extractMetric(metric, statusHistory);
+            LOG.info("Refreshing model for connection id: {} ", connectionIdentifier);
+            //times is the X axis and counts is on the y axis
+            final Stream<Double> times = modelData.getKey();
+            final Stream<Double> counts = modelData.getValue();
+            model.learn(times, counts);
+        });
+    }
+
+    /**
+     * Returns the predicted time (in milliseconds) until backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
+     *
+     * @return milliseconds until backpressure is predicted to occur (never negative), or Long.MAX_VALUE when the bytes model yields no prediction (NaN).
+     * @throws NoSuchElementException if the connection for this analytics object can no longer be found
+     */
+    public long getTimeToBytesBackpressureMillis() {
+        final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+        final Connection connection = getConnection();
+        if (connection == null) {
+            throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
+        }
+        final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
+        final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
+        // predictX yields the x value (a timestamp, since it is compared with the clock below) for the threshold y.
+        final double prediction = bytesModel.predictX(backPressureBytes);
+        // NaN never compares equal to anything, itself included, so "!= Double.NaN" is always true; test with isNaN.
+        if (Double.isNaN(prediction)) {
+            return Long.MAX_VALUE;
+        }
+        return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
+    }
+
+    /**
+     * Returns the predicted time (in milliseconds) until backpressure is expected to be applied to this connection, based on the number of objects in the queue.
+     *
+     * @return milliseconds until backpressure is predicted to occur (never negative), or Long.MAX_VALUE when the count model yields no prediction (NaN).
+     * @throws NoSuchElementException if the connection for this analytics object can no longer be found
+     */
+    public long getTimeToCountBackpressureMillis() {
+        final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+        final Connection connection = getConnection();
+        if (connection == null) {
+            throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
+        }
+        final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
+        // predictX yields the x value (a timestamp, since it is compared with the clock below) for the threshold y.
+        final Double prediction = countModel.predictX(backPressureCountThreshold);
+        // NaN never compares equal to anything, itself included, so "!= Double.NaN" is always true; test with isNaN.
+        if (prediction.isNaN()) {
+            return Long.MAX_VALUE;
+        }
+        return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
+    }
+
+    /**
+     * Returns the predicted total number of bytes in the queue at the next interval, i.e. one query-window length from now.
+     *
+     * @return predicted number of queued bytes at the next interval, or 0 when the bytes model yields no prediction (NaN).
+     */
+    public long getNextIntervalBytes() {
+        final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+        // Evaluate the model at "now + interval"; the interval is the length of the query window.
+        final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
+        // "!= Double.NaN" is always true (NaN is unequal to everything), so the fallback needs an isNaN test.
+        if (prediction.isNaN()) {
+            return 0;
+        }
+        return Math.round(prediction);
+    }
+
+    /**
+     * Returns the predicted number of objects in the queue at the next interval, i.e. one query-window length from now.
+     *
+     * @return predicted number of queued objects at the next interval, or 0 when the count model yields no prediction (NaN).
+     */
+    public int getNextIntervalCount() {
+        final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+        // Evaluate the model at "now + interval"; the interval is the length of the query window.
+        final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
+        // "!= Double.NaN" is always true (NaN is unequal to everything), so the fallback needs an isNaN test.
+        if (prediction.isNaN()) {
+            return 0;
+        }
+        return ((Long) Math.round(prediction)).intValue();
+    }
+
+    public int getNextIntervalPercentageUseCount(){
+        // Predicted object count at the next interval, as a rounded percentage of the backpressure object threshold.
+        final Connection conn = getConnection();
+        if (conn == null) {
+            throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
+        }
+        final double objectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
+        final double fractionUsed = getNextIntervalCount() / objectThreshold;
+        final long roundedPercent = Math.round(fractionUsed * 100);
+        return (int) roundedPercent;
+    }
+
+    /** Predicted queued bytes at the next interval, as a rounded percentage of the back pressure data size threshold. */
+    public int getNextIntervalPercentageUseBytes(){
+        final Connection connection = getConnection();
+        if (connection == null) {
+            throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
+        }
+        // The queue reports this threshold as a data-size string; parse it into bytes before taking the ratio.
+        final String sizeThreshold = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
+        final double thresholdBytes = DataUnit.parseDataSize(sizeThreshold, DataUnit.B);
+        final long percent = Math.round((getNextIntervalBytes() / thresholdBytes) * 100);
+        return (int) percent;
+    }
+
+    public long getIntervalTimeMillis(){
+        return getQueryWindow().getTimeDifferenceMillis(); // also the look-ahead the getNextInterval* predictions add to the current time
+    }
+
+    @Override
+    public QueryWindow getQueryWindow() {
+        return queryWindow; // getIntervalTimeMillis() reads its time difference
+    }
+
+    /**
+     * Returns all available predictions, keyed by prediction name; int-valued predictions are widened to long.
+     *
+     * @return a new map holding the seven values computed by this instance's prediction getters
+     */
+    @Override
+    public Map<String, Long> getPredictions() {
+        final Map<String, Long> predictions = new HashMap<>();
+        predictions.put("timeToBytesBackpressureMillis", getTimeToBytesBackpressureMillis());
+        predictions.put("timeToCountBackpressureMillis", getTimeToCountBackpressureMillis());
+        predictions.put("nextIntervalBytes", getNextIntervalBytes());
+        predictions.put("nextIntervalCount", (long) getNextIntervalCount());
+        predictions.put("nextIntervalPercentageUseCount", (long) getNextIntervalPercentageUseCount());
+        predictions.put("nextIntervalPercentageUseBytes", (long) getNextIntervalPercentageUseBytes());
+        predictions.put("intervalTimeMillis", getIntervalTimeMillis());
+
+        // Debug level: one line per prediction on every call is too chatty for info.
+        predictions.forEach((key, value) -> LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier, key, value));
+
+        return predictions;
+    }
+
+    @Override
+    public boolean supportsOnlineLearning() {
+        return true; // constant for this implementation; no state is consulted
+    }
+
+    private Connection getConnection() {
+        final ProcessGroup root = flowController.getFlowManager().getRootGroup(); // the lookup starts from the root group
+        final Optional<Connection> match = root.findAllConnections().stream().filter(candidate -> candidate.getIdentifier().equals(this.connectionIdentifier)).findFirst();
+        return match.orElse(null); // null means no connection carries this id; the percentage getters turn that into NoSuchElementException
+    }
+
+    private interface ExtractFunction {
+        Tuple<Stream<Double>, Stream<Double>> extractMetric(String metric, StatusHistory statusHistory); // the extract lambda below yields (timestamps, metric values)
+    }
+
+    private final ExtractFunction extract = (metric, statusHistory) -> {
+        // Builds the model input: snapshot timestamps (getTime() values) paired with the requested metric's value at that time.
+        List<Double> values = new ArrayList<>();
+        List<Double> times = new ArrayList<>();
+        StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory);
+
+        for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) {
+            Long snapValue = snap.getStatusMetrics().get(metric);
+            // A snapshot without this metric would throw on unboxing; skip it so times and values stay paired.
+            if (snapValue != null) {
+                values.add((double) snapValue);
+                times.add((double) snap.getTimestamp().getTime());
+            }
+        }
+        return new Tuple<>(times.stream(), values.stream());
+    };
+
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
index 11862c8..7f9db25 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
@@ -16,18 +16,8 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.math3.stat.regression.SimpleRegression;
-import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
-import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
-import org.apache.nifi.controller.status.history.StatusHistoryUtil;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,114 +33,11 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
     }
 
     @Override
-    public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) {
-        ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
-        return getConnectionStatusAnalytics(rootGroup.findConnection(connectionId));
+    public StatusAnalytics getStatusAnalytics(String identifier) {
+        ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier);
+        connectionStatusAnalytics.init();
+        return connectionStatusAnalytics;
     }
 
-    /**
-     * Finds the number of millis until the given connection will experience backpressure.
-     * @param conn the connection to run the analytic on
-     * @return
-     */
-    public ConnectionStatusAnalytics getConnectionStatusAnalytics(Connection conn) {
-        LOG.debug("Getting connection history for: " + conn.getIdentifier());
-        long connTimeToBackpressure;
-        Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
-        StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
-                statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
-        List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
-
-        if (aggregateSnapshots.size() < 2) {
-            LOG.info("Not enough data to model time to backpressure.");
-            connTimeToBackpressure = Long.MAX_VALUE;
-        } else {
-
-            long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
-            LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is " + backPressureObjectThreshold);
-
-            ConnectionStatusDescriptor.QUEUED_COUNT.getField();
-
-            SimpleRegression regression = new SimpleRegression();
-
-            for (StatusSnapshotDTO snap : aggregateSnapshots) {
-                Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
-                long snapTime = snap.getTimestamp().getTime();
-                regression.addData(snapTime, snapQueuedCount);
-            }
-
-            // Skip this connection if its queue is declining.
-            if (regression.getSlope() <= 0) {
-                LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
-                connTimeToBackpressure = Long.MAX_VALUE;
-            } else {
-
-                // Compute time-to backpressure for this connection; Reduce total result iff
-                // this connection is lower.
-                connTimeToBackpressure = Math
-                        .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
-                        - System.currentTimeMillis();
-                LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
-            }
-        }
-
-        return new ConnectionStatusAnalytics() {
-
-            @Override
-            public String getSourceName() {
-                return conn.getSource().getName();
-            }
 
-            @Override
-            public String getSourceId() {
-                return conn.getSource().getIdentifier();
-            }
-
-            @Override
-            public String getName() {
-                return conn.getName();
-            }
-
-            @Override
-            public long getTimeToCountBackpressureMillis() {
-                return connTimeToBackpressure;
-            }
-
-            // TODO - populate the other prediction fields
-            @Override
-            public long getTimeToBytesBackpressureMillis() {
-                return 0;
-            }
-
-            @Override
-            public long getNextIntervalBytes() {
-                return 0;
-            }
-
-            @Override
-            public int getNextIntervalCount() {
-                return 0;
-            }
-
-            @Override
-            public String getId() {
-                return conn.getIdentifier();
-            }
-
-            @Override
-            public String getGroupId() {
-                return conn.getProcessGroup().getIdentifier();
-            }
-
-            @Override
-            public String getDestinationName() {
-                return conn.getDestination().getName();
-            }
-
-            @Override
-            public String getDestinationId() {
-                return conn.getDestination().getIdentifier();
-            }
-        };
-    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
new file mode 100644
index 0000000..8aa4a45
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.stat.regression.SimpleRegression;
+
+/** Bivariate status analytics model that fits a straight line through (feature, label) pairs using commons-math SimpleRegression. */
+public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel {
+
+    private SimpleRegression regression = new SimpleRegression();
+
+    // Refits from scratch: earlier observations are cleared before the new (feature, label) pairs are added.
+    @Override
+    public void learn(Stream<Double> features, Stream<Double> labels) {
+        final double[] targets = ArrayUtils.toPrimitive(labels.toArray(Double[]::new));
+        final double[][] rows = features.map(value -> new double[]{value}).toArray(double[][]::new);
+        this.regression.clear();
+        this.regression.addObservations(rows, targets);
+    }
+
+    @Override
+    public Double predict(Double feature) {
+        return this.predictY(feature);
+    }
+
+    // Inverts the fitted line: x = (y - intercept) / slope.
+    @Override
+    public Double predictX(Double y) {
+        return (y - this.regression.getIntercept()) / this.regression.getSlope();
+    }
+
+    // Evaluates the fitted line: y = slope * x + intercept.
+    @Override
+    public Double predictY(Double x) {
+        return this.regression.getSlope() * x + this.regression.getIntercept();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index 33e650b..1e36975 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -343,12 +344,16 @@ public class StandardEventAccess implements UserAwareEventAccess {
             }
 
             if (statusAnalyticsEngine != null) {
-                StatusAnalytics statusAnalytics = statusAnalyticsEngine.getConnectionStatusAnalytics(conn.getIdentifier());
+                StatusAnalytics statusAnalytics =  statusAnalyticsEngine.getStatusAnalytics(conn.getIdentifier());
                 if (statusAnalytics != null) {
-                    connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis());
-                    connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis());
-                    connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes());
-                    connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount());
+                    Map<String,Long> predictions = statusAnalytics.getPredictions();
+                    connStatus.setPredictedTimeToBytesBackpressureMillis(predictions.get("timeToBytesBackpressureMillis"));
+                    connStatus.setPredictedTimeToCountBackpressureMillis(predictions.get("timeToCountBackpressureMillis"));
+                    connStatus.setNextPredictedQueuedBytes(predictions.get("nextIntervalBytes"));
+                    connStatus.setNextPredictedQueuedCount(predictions.get("nextIntervalCount").intValue());
+                    connStatus.setPredictedPercentCount(predictions.get("nextIntervalPercentageUseCount").intValue());
+                    connStatus.setPredictedPercentBytes(predictions.get("nextIntervalPercentageUseBytes").intValue());
+                    connStatus.setPredictionIntervalMillis(predictions.get("intervalTimeMillis"));
                 }
             }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index f2e7608..3614b9d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,35 @@
  */
 package org.apache.nifi.web;
 
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
@@ -309,33 +337,7 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import com.google.common.collect.Sets;
 
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
@@ -3197,7 +3199,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     public ConnectionStatisticsEntity getConnectionStatistics(final String connectionId) {
         final Connection connection = connectionDAO.getConnection(connectionId);
         final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
-        final ConnectionStatisticsDTO dto = dtoFactory.createConnectionStatisticsDto(controllerFacade.getConnectionStatistics(connectionId));
+        final ConnectionStatisticsDTO dto = dtoFactory.createConnectionStatisticsDto(connection, controllerFacade.getConnectionStatusAnalytics(connectionId));
         return entityFactory.createConnectionStatisticsEntity(dto, permissions);
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 88cc299..e56e99a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,34 @@
  */
 package org.apache.nifi.web.api.dto;
 
+import java.text.Collator;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -105,7 +133,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
 import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
 import org.apache.nifi.diagnostics.GarbageCollection;
@@ -234,33 +262,6 @@ import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.revision.RevisionManager;
 
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 public final class DtoFactory {
 
     @SuppressWarnings("rawtypes")
@@ -1197,30 +1198,35 @@ public final class DtoFactory {
         return connectionStatusDto;
     }
 
-    public ConnectionStatisticsDTO createConnectionStatisticsDto(final ConnectionStatusAnalytics connectionStatistics) {
+    public ConnectionStatisticsDTO createConnectionStatisticsDto(final Connection connection, final StatusAnalytics statusAnalytics) {
         final ConnectionStatisticsDTO connectionStatisticsDTO = new ConnectionStatisticsDTO();
-        connectionStatisticsDTO.setGroupId(connectionStatistics.getGroupId());
-        connectionStatisticsDTO.setId(connectionStatistics.getId());
-        connectionStatisticsDTO.setName(connectionStatistics.getName());
-        connectionStatisticsDTO.setSourceId(connectionStatistics.getSourceId());
-        connectionStatisticsDTO.setSourceName(connectionStatistics.getSourceName());
-        connectionStatisticsDTO.setDestinationId(connectionStatistics.getDestinationId());
-        connectionStatisticsDTO.setDestinationName(connectionStatistics.getDestinationName());
+
+        connectionStatisticsDTO.setGroupId(connection.getProcessGroup().getIdentifier());
+        connectionStatisticsDTO.setId(connection.getIdentifier());
+        connectionStatisticsDTO.setName(connection.getName());
+        connectionStatisticsDTO.setSourceId(connection.getSource().getIdentifier());
+        connectionStatisticsDTO.setSourceName(connection.getSource().getName());
+        connectionStatisticsDTO.setDestinationId(connection.getDestination().getIdentifier());
+        connectionStatisticsDTO.setDestinationName(connection.getDestination().getName());
         connectionStatisticsDTO.setStatsLastRefreshed(new Date());
 
         final ConnectionStatisticsSnapshotDTO snapshot = new ConnectionStatisticsSnapshotDTO();
         connectionStatisticsDTO.setAggregateSnapshot(snapshot);
 
-        snapshot.setId(connectionStatistics.getId());
-        snapshot.setGroupId(connectionStatistics.getGroupId());
-        snapshot.setName(connectionStatistics.getName());
-        snapshot.setSourceName(connectionStatistics.getSourceName());
-        snapshot.setDestinationName(connectionStatistics.getDestinationName());
-
-        snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatistics.getTimeToBytesBackpressureMillis());
-        snapshot.setPredictedMillisUntilCountBackpressure(connectionStatistics.getTimeToCountBackpressureMillis());
-        snapshot.setPredictedBytesAtNextInterval(connectionStatistics.getNextIntervalBytes());
-        snapshot.setPredictedCountAtNextInterval(connectionStatistics.getNextIntervalCount());
+        snapshot.setId(connection.getIdentifier());
+        snapshot.setGroupId(connection.getProcessGroup().getIdentifier());
+        snapshot.setName(connection.getName());
+        snapshot.setSourceName(connection.getSource().getName());
+        snapshot.setDestinationName(connection.getDestination().getName());
+
+        Map<String,Long> predictions = statusAnalytics.getPredictions();
+        snapshot.setPredictedMillisUntilBytesBackpressure(predictions.get("timeToBytesBackpressureMillis"));
+        snapshot.setPredictedMillisUntilCountBackpressure(predictions.get("timeToCountBackpressureMillis"));
+        snapshot.setPredictedBytesAtNextInterval(predictions.get("nextIntervalBytes"));
+        snapshot.setPredictedCountAtNextInterval(predictions.get("nextIntervalCount").intValue());
+        snapshot.setPredictedPercentBytes(predictions.get("nextIntervalPercentageUseBytes").intValue());
+        snapshot.setPredictedPercentCount(predictions.get("nextIntervalPercentageUseCount").intValue());
+        snapshot.setPredictionIntervalMillis(predictions.get("intervalTimeMillis"));
 
         return connectionStatisticsDTO;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 2fcef27..48abda9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -16,6 +16,30 @@
  */
 package org.apache.nifi.web.controller;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -56,7 +80,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
@@ -112,29 +136,6 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TimeZone;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 public class ControllerFacade implements Authorizable {
 
     private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
@@ -683,12 +684,12 @@ public class ControllerFacade implements Authorizable {
     }
 
     /**
-     * Gets analytical statistics for the specified connection.
+     * Gets status analytics for the specified connection.
      *
      * @param connectionId connection id
      * @return the statistics for the specified connection
      */
-    public ConnectionStatusAnalytics getConnectionStatistics(final String connectionId) {
+    public StatusAnalytics getConnectionStatusAnalytics(final String connectionId) {
         final ProcessGroup root = getRootGroup();
         final Connection connection = root.findConnection(connectionId);
 
@@ -705,12 +706,12 @@ public class ControllerFacade implements Authorizable {
         }
 
         // get from flow controller
-        final StatusAnalyticsEngine status = flowController.getStatusAnalyticsEngine();
-        if (status == null) {
+        final StatusAnalyticsEngine statusAnalyticsEngine = flowController.getStatusAnalyticsEngine();
+        if (statusAnalyticsEngine == null) {
             throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
         }
 
-        return status.getConnectionStatusAnalytics(connectionId);
+        return statusAnalyticsEngine.getStatusAnalytics(connectionId);
     }
 
     /**