You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/28 20:20:59 UTC

[nifi] 06/24: NIFI-6510 Updated objects and interfaces to reflect 4 prediction metrics

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit cb5825022dfddb268a78efc35431f73c6afd3cab
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jul 22 18:11:58 2019 -0400

    NIFI-6510 Updated objects and interfaces to reflect 4 prediction metrics
    
    (cherry picked from commit 050e0fc)
    
    (cherry picked from commit 9fd365f)
---
 .../nifi/controller/status/ConnectionStatus.java   | 48 ++++++++++++++
 .../analytics/ConnectionStatusAnalytics.java       | 26 +++++++-
 .../status/analytics/StatusAnalytics.java          | 30 ++++++++-
 .../status/ConnectionStatisticsSnapshotDTO.java    | 47 ++++++++++++--
 .../dto/status/ConnectionStatusSnapshotDTO.java    | 44 +++++++++++++
 .../entity/ConnectionStatisticsSnapshotEntity.java |  1 -
 .../org/apache/nifi/controller/FlowController.java | 75 +++++++++-------------
 .../status/analytics/StatusAnalyticEngine.java     | 42 +++++++++++-
 .../apache/nifi/reporting/StandardEventAccess.java |  9 +++
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |  9 ++-
 10 files changed, 274 insertions(+), 57 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
index 341fda2..0e5d306 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
@@ -40,6 +40,10 @@ public class ConnectionStatus implements Cloneable {
     private long outputBytes;
     private int maxQueuedCount;
     private long maxQueuedBytes;
+    private int nextPredictedQueuedCount;
+    private long nextPredictedQueuedBytes;
+    private long predictedTimeToCountBackpressureMillis;
+    private long predictedTimeToBytesBackpressureMillis;
 
     public String getId() {
         return id;
@@ -186,6 +190,38 @@ public class ConnectionStatus implements Cloneable {
         this.backPressureBytesThreshold = backPressureBytesThreshold;
     }
 
+    public int getNextPredictedQueuedCount() {
+        return nextPredictedQueuedCount;
+    }
+
+    public void setNextPredictedQueuedCount(int nextPredictedQueuedCount) {
+        this.nextPredictedQueuedCount = nextPredictedQueuedCount;
+    }
+
+    public long getNextPredictedQueuedBytes() {
+        return nextPredictedQueuedBytes;
+    }
+
+    public void setNextPredictedQueuedBytes(long nextPredictedQueuedBytes) {
+        this.nextPredictedQueuedBytes = nextPredictedQueuedBytes;
+    }
+
+    public long getPredictedTimeToCountBackpressureMillis() {
+        return predictedTimeToCountBackpressureMillis;
+    }
+
+    public void setPredictedTimeToCountBackpressureMillis(long predictedTimeToCountBackpressureMillis) {
+        this.predictedTimeToCountBackpressureMillis = predictedTimeToCountBackpressureMillis;
+    }
+
+    public long getPredictedTimeToBytesBackpressureMillis() {
+        return predictedTimeToBytesBackpressureMillis;
+    }
+
+    public void setPredictedTimeToBytesBackpressureMillis(long predictedTimeToBytesBackpressureMillis) {
+        this.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis;
+    }
+
     @Override
     public ConnectionStatus clone() {
         final ConnectionStatus clonedObj = new ConnectionStatus();
@@ -206,6 +242,10 @@ public class ConnectionStatus implements Cloneable {
         clonedObj.backPressureObjectThreshold = backPressureObjectThreshold;
         clonedObj.maxQueuedBytes = maxQueuedBytes;
         clonedObj.maxQueuedCount = maxQueuedCount;
+        clonedObj.nextPredictedQueuedBytes = nextPredictedQueuedBytes;
+        clonedObj.nextPredictedQueuedCount = nextPredictedQueuedCount;
+        clonedObj.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis;
+        clonedObj.predictedTimeToCountBackpressureMillis = predictedTimeToCountBackpressureMillis;
         return clonedObj;
     }
 
@@ -246,6 +286,14 @@ public class ConnectionStatus implements Cloneable {
         builder.append(maxQueuedCount);
         builder.append(", maxQueueBytes=");
         builder.append(maxQueuedBytes);
+        builder.append(", nextPredictedQueuedBytes=");
+        builder.append(nextPredictedQueuedBytes);
+        builder.append(", nextPredictedQueuedCount=");
+        builder.append(nextPredictedQueuedCount);
+        builder.append(", predictedTimeToBytesBackpressureMillis=");
+        builder.append(predictedTimeToBytesBackpressureMillis);
+        builder.append(", predictedTimeToCountBackpressureMillis=");
+        builder.append(predictedTimeToCountBackpressureMillis);
         builder.append("]");
         return builder.toString();
     }
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 12c8a15..2380d55 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -17,7 +17,31 @@
 package org.apache.nifi.controller.status.analytics;
 
 public interface ConnectionStatusAnalytics {
-    long getMinTimeToBackpressureMillis();
+
+    /**
+     * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
+     * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+     */
+    long getTimeToBytesBackpressureMillis();
+
+    /**
+     * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
+     * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
+     */
+    long getTimeToCountBackpressureMillis();
+
+    /**
+     * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+     * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+     */
+    long getNextIntervalBytes();
+
+    /**
+     * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+     * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
+     */
+    int getNextIntervalCount();
+
     String getGroupId();
     String getId();
     String getName();
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index 42c2abd..7d29314 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -21,7 +21,35 @@ package org.apache.nifi.controller.status.analytics;
  */
 public interface StatusAnalytics {
 
+    /**
+     * Returns a ConnectionStatusAnalytics object containing all relevant metrics and analytical & statistical objects, as well as identity information for the connection.
+     *
+     * @param connectionId The unique ID of the connection
+     * @return A ConnectionStatusAnalytics object
+     */
     ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId);
 
-    public long getMinTimeToBackpressureMillis();
+    /**
+     * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
+     * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+     */
+    long getTimeToBytesBackpressureMillis();
+
+    /**
+     * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
+     * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
+     */
+    long getTimeToCountBackpressureMillis();
+
+    /**
+     * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+     * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+     */
+    long getNextIntervalBytes();
+
+    /**
+     * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+     * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
+     */
+    int getNextIntervalCount();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
index e914f74..526bdcf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
@@ -35,7 +35,10 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
     private String destinationId;
     private String destinationName;
 
-    private Long predictedMillisUntilBackpressure = 0L;
+    private Long predictedMillisUntilCountBackpressure = 0L;
+    private Long predictedMillisUntilBytesBackpressure = 0L;
+    private Integer predictedCountAtNextInterval = 0;
+    private Long predictedBytesAtNextInterval = 0L;
 
     /* getters / setters */
     /**
@@ -122,13 +125,40 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
         this.destinationName = destinationName;
     }
 
-    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied.")
-    public Long getPredictedMillisUntilBackpressure() {
-        return predictedMillisUntilBackpressure;
+    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the queued count.")
+    public Long getPredictedMillisUntilCountBackpressure() {
+        return predictedMillisUntilCountBackpressure;
     }
 
-    public void setPredictedMillisUntilBackpressure(Long predictedMillisUntilBackpressure) {
-        this.predictedMillisUntilBackpressure = predictedMillisUntilBackpressure;
+    public void setPredictedMillisUntilCountBackpressure(Long predictedMillisUntilCountBackpressure) {
+        this.predictedMillisUntilCountBackpressure = predictedMillisUntilCountBackpressure;
+    }
+
+    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the total number of bytes in the queue.")
+    public Long getPredictedMillisUntilBytesBackpressure() {
+        return predictedMillisUntilBytesBackpressure;
+    }
+
+    public void setPredictedMillisUntilBytesBackpressure(Long predictedMillisUntilBytesBackpressure) {
+        this.predictedMillisUntilBytesBackpressure = predictedMillisUntilBytesBackpressure;
+    }
+
+    @ApiModelProperty("The predicted number of queued objects at the next configured interval.")
+    public Integer getPredictedCountAtNextInterval() {
+        return predictedCountAtNextInterval;
+    }
+
+    public void setPredictedCountAtNextInterval(Integer predictedCountAtNextInterval) {
+        this.predictedCountAtNextInterval = predictedCountAtNextInterval;
+    }
+
+    @ApiModelProperty("The predicted total number of bytes in the queue at the next configured interval.")
+    public Long getPredictedBytesAtNextInterval() {
+        return predictedBytesAtNextInterval;
+    }
+
+    public void setPredictedBytesAtNextInterval(Long predictedBytesAtNextInterval) {
+        this.predictedBytesAtNextInterval = predictedBytesAtNextInterval;
     }
 
     @Override
@@ -142,7 +172,10 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
         other.setSourceId(getSourceId());
         other.setSourceName(getSourceName());
 
-        other.setPredictedMillisUntilBackpressure(getPredictedMillisUntilBackpressure());
+        other.setPredictedMillisUntilCountBackpressure(getPredictedMillisUntilCountBackpressure());
+        other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure());
+        other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval());
+        other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval());
 
         return other;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
index 7ba93cc..3237385 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
@@ -48,6 +48,10 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
     private String queuedCount;
     private Integer percentUseCount;
     private Integer percentUseBytes;
+    private Long predictedMillisUntilCountBackpressure = 0L;
+    private Long predictedMillisUntilBytesBackpressure = 0L;
+    private Integer predictedCountAtNextInterval = 0;
+    private Long predictedBytesAtNextInterval = 0L;
 
     /* getters / setters */
     /**
@@ -271,6 +275,42 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
         this.percentUseBytes = percentUseBytes;
     }
 
+    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the queued count.")
+    public Long getPredictedMillisUntilCountBackpressure() {
+        return predictedMillisUntilCountBackpressure;
+    }
+
+    public void setPredictedMillisUntilCountBackpressure(Long predictedMillisUntilCountBackpressure) {
+        this.predictedMillisUntilCountBackpressure = predictedMillisUntilCountBackpressure;
+    }
+
+    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the total number of bytes in the queue.")
+    public Long getPredictedMillisUntilBytesBackpressure() {
+        return predictedMillisUntilBytesBackpressure;
+    }
+
+    public void setPredictedMillisUntilBytesBackpressure(Long predictedMillisUntilBytesBackpressure) {
+        this.predictedMillisUntilBytesBackpressure = predictedMillisUntilBytesBackpressure;
+    }
+
+    @ApiModelProperty("The predicted number of queued objects at the next configured interval.")
+    public Integer getPredictedCountAtNextInterval() {
+        return predictedCountAtNextInterval;
+    }
+
+    public void setPredictedCountAtNextInterval(Integer predictedCountAtNextInterval) {
+        this.predictedCountAtNextInterval = predictedCountAtNextInterval;
+    }
+
+    @ApiModelProperty("The predicted total number of bytes in the queue at the next configured interval.")
+    public Long getPredictedBytesAtNextInterval() {
+        return predictedBytesAtNextInterval;
+    }
+
+    public void setPredictedBytesAtNextInterval(Long predictedBytesAtNextInterval) {
+        this.predictedBytesAtNextInterval = predictedBytesAtNextInterval;
+    }
+
     @Override
     public ConnectionStatusSnapshotDTO clone() {
         final ConnectionStatusSnapshotDTO other = new ConnectionStatusSnapshotDTO();
@@ -295,6 +335,10 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
         other.setQueuedSize(getQueuedSize());
         other.setPercentUseBytes(getPercentUseBytes());
         other.setPercentUseCount(getPercentUseCount());
+        other.setPredictedMillisUntilCountBackpressure(getPredictedMillisUntilCountBackpressure());
+        other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure());
+        other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval());
+        other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval());
 
         return other;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
index da7e5ca..6f4eee5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
@@ -19,7 +19,6 @@ package org.apache.nifi.web.api.entity;
 import io.swagger.annotations.ApiModelProperty;
 import org.apache.nifi.web.api.dto.ReadablePermission;
 import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
 
 /**
  * A serialized representation of this class can be placed in the entity body of a request or response to or from the API.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 462b113..5f67b49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,39 +16,6 @@
  */
 package org.apache.nifi.controller;
 
-import static java.util.Objects.requireNonNull;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import javax.net.ssl.SSLContext;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -217,6 +184,38 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
 public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
 
     // default repository implementations
@@ -608,16 +607,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
 
         analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository);
 
-        timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    analyticsEngine.refreshModel();
-                } catch (final Exception e) {
-                    LOG.error("Failed to refresh model:", e);
-                }
-            }
-        }, 1, 1, TimeUnit.MINUTES); //FIXME use a real/configured interval (or maybe just compute on the fly when requested)
 
         this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 64c2065..024c138 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -113,10 +113,26 @@ public class StatusAnalyticEngine implements StatusAnalytics {
             }
 
             @Override
-            public long getMinTimeToBackpressureMillis() {
+            public long getTimeToCountBackpressureMillis() {
                 return connTimeToBackpressure;
             }
 
+            // TODO - populate the other prediction fields
+            @Override
+            public long getTimeToBytesBackpressureMillis() {
+                return 0;
+            }
+
+            @Override
+            public long getNextIntervalBytes() {
+                return 0;
+            }
+
+            @Override
+            public int getNextIntervalCount() {
+                return 0;
+            }
+
             @Override
             public String getId() {
                 return conn.getIdentifier();
@@ -139,7 +155,6 @@ public class StatusAnalyticEngine implements StatusAnalytics {
         };
     }
 
-    @Override
     public long getMinTimeToBackpressureMillis() {
         ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
         List<Connection> allConnections = rootGroup.findAllConnections();
@@ -148,10 +163,31 @@ public class StatusAnalyticEngine implements StatusAnalytics {
 
         for (Connection conn : allConnections) {
             ConnectionStatusAnalytics connAnalytics = getConnectionStatusAnalytics(conn);
-            minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getMinTimeToBackpressureMillis());
+            minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getTimeToCountBackpressureMillis());
         }
 
         LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure));
         return minTimeToBackpressure;
     }
+
+    // TODO - populate the prediction fields. Do we need to pass in connection ID?
+    @Override
+    public long getTimeToCountBackpressureMillis() {
+        return 0;
+    }
+
+    @Override
+    public long getTimeToBytesBackpressureMillis() {
+        return 0;
+    }
+
+    @Override
+    public long getNextIntervalBytes() {
+        return 0;
+    }
+
+    @Override
+    public int getNextIntervalCount() {
+        return 0;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index 0b4b73c..f943856 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -51,6 +51,7 @@ import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.controller.status.RunStatus;
 import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.history.History;
@@ -338,6 +339,14 @@ public class StandardEventAccess implements UserAwareEventAccess {
                 bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
             }
 
+            final StatusAnalytics statusAnalytics = flowController.getStatusAnalytics();
+            if (statusAnalytics != null) {
+                connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis());
+                connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis());
+                connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes());
+                connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount());
+            }
+
             if (isConnectionAuthorized) {
                 if (StringUtils.isNotBlank(conn.getName())) {
                     connStatus.setName(conn.getName());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 5afdd5a..2059c45 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1181,9 +1181,13 @@ public final class DtoFactory {
 
         if (connectionStatus.getBackPressureObjectThreshold() > 0) {
             snapshot.setPercentUseCount(Math.min(100, StatusMerger.getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold())));
+            snapshot.setPredictedMillisUntilCountBackpressure(connectionStatus.getPredictedTimeToCountBackpressureMillis());
+            snapshot.setPredictedCountAtNextInterval(connectionStatus.getNextPredictedQueuedCount());
         }
         if (connectionStatus.getBackPressureBytesThreshold() > 0) {
             snapshot.setPercentUseBytes(Math.min(100, StatusMerger.getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold())));
+            snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatus.getPredictedTimeToBytesBackpressureMillis());
+            snapshot.setPredictedBytesAtNextInterval(connectionStatus.getNextPredictedQueuedBytes());
         }
 
         StatusMerger.updatePrettyPrintedFields(snapshot);
@@ -1211,7 +1215,10 @@ public final class DtoFactory {
         snapshot.setSourceName(connectionStatistics.getSourceName());
         snapshot.setDestinationName(connectionStatistics.getDestinationName());
 
-        snapshot.setPredictedMillisUntilBackpressure(connectionStatistics.getMinTimeToBackpressureMillis());
+        snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatistics.getTimeToBytesBackpressureMillis());
+        snapshot.setPredictedMillisUntilCountBackpressure(connectionStatistics.getTimeToCountBackpressureMillis());
+        snapshot.setPredictedBytesAtNextInterval(connectionStatistics.getNextIntervalBytes());
+        snapshot.setPredictedCountAtNextInterval(connectionStatistics.getNextIntervalCount());
 
         return connectionStatisticsDTO;
     }