You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:43:03 UTC

[nifi] 09/23: NIFI-6510 Revert "DFA-9 Remove redundant connection prediction interfaces as we can just use ConnectionStatusAnalytics directly"

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 9761ee1b4d19f509d39f4d53eb32e620fdfe28d7
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Jul 24 13:06:51 2019 -0400

    NIFI-6510 Revert "DFA-9 Remove redundant connection prediction interfaces as we can just use ConnectionStatusAnalytics directly"
    
    This reverts commit 5b9fead1471059098c0e98343fb337070f1c75c1.
---
 .../status/analytics/StatusAnalytics.java          | 28 ++++++++++++++++++++++
 .../analytics/CachingStatusAnalyticEngine.java     | 20 ++++++++++++++++
 .../status/analytics/StatusAnalyticEngine.java     | 22 +++++++++++++++++
 .../apache/nifi/reporting/StandardEventAccess.java | 10 ++++----
 4 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index 131531f..45e1c12 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -28,4 +28,32 @@ public interface StatusAnalytics {
      * @return A ConnectionStatusAnalytics object
      */
     ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId);
+
+    /**
+     * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to the given connection, based on the total number of bytes in the queue.
+     * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+     * @param connectionId the identifier of the connection to return the prediction for
+     */
+    long getTimeToBytesBackpressureMillis(String connectionId);
+
+    /**
+     * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to the given connection, based on the number of objects in the queue.
+     * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
+     * @param connectionId the identifier of the connection to return the prediction for
+     */
+    long getTimeToCountBackpressureMillis(String connectionId);
+
+    /**
+     * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+     * @return the total number of bytes predicted to be in the queue at the next configured interval.
+     * @param connectionId the identifier of the connection to return the prediction for
+     */
+    long getNextIntervalBytes(String connectionId);
+
+    /**
+     * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+     * @return the number of objects predicted to be in the queue at the next configured interval.
+     * @param connectionId the identifier of the connection to return the prediction for
+     */
+    int getNextIntervalCount(String connectionId);
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
index 015d6f8..864a5d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
@@ -51,6 +51,26 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
         return cachedResult;
     }
 
+    @Override
+    public long getTimeToBytesBackpressureMillis(String connectionId) { // bytes-based time-to-backpressure prediction for the given connection
+        return getConnectionStatusAnalytics(connectionId).getTimeToBytesBackpressureMillis(); // delegate like getTimeToCountBackpressureMillis instead of reporting a constant 0
+    }
+
+    @Override
+    public long getTimeToCountBackpressureMillis(String connectionId) { // count-based time-to-backpressure prediction for the given connection
+        return getConnectionStatusAnalytics(connectionId).getTimeToCountBackpressureMillis(); // read the value from the analytics object obtained for this connection id
+    }
+
+    @Override
+    public long getNextIntervalBytes(String connectionId) { // predicted queued bytes at the next interval for the given connection
+        return getConnectionStatusAnalytics(connectionId).getNextIntervalBytes(); // delegate like getTimeToCountBackpressureMillis instead of reporting a constant 0
+    }
+
+    @Override
+    public int getNextIntervalCount(String connectionId) { // predicted queued object count at the next interval for the given connection
+        return getConnectionStatusAnalytics(connectionId).getNextIntervalCount(); // delegate like getTimeToCountBackpressureMillis instead of reporting a constant 0
+    }
+
     protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn){
         long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 56c263e..5a873d5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -158,6 +158,7 @@ public class StatusAnalyticEngine implements StatusAnalytics {
     public long getMinTimeToBackpressureMillis() {
         ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
         List<Connection> allConnections = rootGroup.findAllConnections();
+        rootGroup.findConnection("asdf");
         long minTimeToBackpressure = Long.MAX_VALUE;
 
         for (Connection conn : allConnections) {
@@ -168,4 +169,25 @@ public class StatusAnalyticEngine implements StatusAnalytics {
         LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure));
         return minTimeToBackpressure;
     }
+
+    // TODO - populate the prediction fields (the per-connection getters here are placeholders that return 0). Do we need to pass in connection ID?
+    @Override
+    public long getTimeToCountBackpressureMillis(String connectionId) {
+        return 0; // placeholder: connectionId is not used and no prediction is computed
+    }
+
+    @Override
+    public long getTimeToBytesBackpressureMillis(String connectionId) {
+        return 0; // placeholder: connectionId is not used and no prediction is computed
+    }
+
+    @Override
+    public long getNextIntervalBytes(String connectionId) {
+        return 0; // placeholder: connectionId is not used and no prediction is computed
+    }
+
+    @Override
+    public int getNextIntervalCount(String connectionId) {
+        return 0; // placeholder: connectionId is not used and no prediction is computed
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index aeb9559..87fcd4d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -51,7 +51,6 @@ import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.controller.status.RunStatus;
 import org.apache.nifi.controller.status.TransmissionStatus;
-import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
 import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
@@ -343,11 +342,10 @@ public class StandardEventAccess implements UserAwareEventAccess {
             }
 
             if (statusAnalytics != null) {
-                ConnectionStatusAnalytics connectionStatusAnalytics = statusAnalytics.getConnectionStatusAnalytics(conn.getIdentifier());
-                connStatus.setPredictedTimeToBytesBackpressureMillis(connectionStatusAnalytics.getTimeToBytesBackpressureMillis());
-                connStatus.setPredictedTimeToCountBackpressureMillis(connectionStatusAnalytics.getTimeToCountBackpressureMillis());
-                connStatus.setNextPredictedQueuedBytes(connectionStatusAnalytics.getNextIntervalBytes());
-                connStatus.setNextPredictedQueuedCount(connectionStatusAnalytics.getNextIntervalCount());
+                connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis(conn.getIdentifier()));
+                connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis(conn.getIdentifier()));
+                connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes(conn.getIdentifier()));
+                connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount(conn.getIdentifier()));
             }
 
             if (isConnectionAuthorized) {