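You are viewing a plain-text version of this content. The canonical version is linked from the original mailing-list archive page (the hyperlink is not preserved in this plain-text rendering).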
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:42:56 UTC

[nifi] 02/23: NIFI-6510 Implemented basic linear regression model for queue counts

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 2d8c70f907ee72391aa9e18f037b8f9b7c6dbb86
Author: Andrew I. Christianson <an...@andyic.org>
AuthorDate: Thu Jul 11 12:02:22 2019 -0400

    NIFI-6510 Implemented basic linear regression model for queue counts
---
 .../nifi-framework/nifi-framework-core/pom.xml     |  5 +++
 .../org/apache/nifi/controller/FlowController.java |  2 +-
 .../status/analytics/StatusAnalyticEngine.java     | 52 ++++++++++++++++++----
 3 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index a1bff42..6551d54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -134,6 +134,11 @@
             <artifactId>commons-io</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>3.6.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-data-provenance-utils</artifactId>
             <version>1.10.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4c0288f..f7ed734 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -608,7 +608,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             @Override
             public void run() {
                 try {
-                    analyticsEngine.getMinTimeToBackpressure();
+                    analyticsEngine.getMinTimeToBackpressureMillis();
                 } catch (final Exception e) {
                     LOG.error("Failed to capture component stats for Stats History", e);
                 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 8b69ebf..0602a93 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+import java.util.Date;
 import java.util.List;
-import java.util.Map.Entry;
 
+import org.apache.commons.math3.stat.regression.SimpleRegression;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.StatusHistoryUtil;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
@@ -40,21 +42,53 @@ public class StatusAnalyticEngine {
         this.statusRepository = statusRepository;
     }
 
-    public long getMinTimeToBackpressure() {
+    public long getMinTimeToBackpressureMillis() {
         ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
         List<Connection> allConnections = rootGroup.findAllConnections();
+        long minTimeToBackpressure = Long.MAX_VALUE;
 
         for (Connection conn : allConnections) {
             LOG.info("Getting connection history for: " + conn.getIdentifier());
-            StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
-                    statusRepository.getConnectionStatusHistory(conn.getIdentifier(), null, null, Integer.MAX_VALUE));
-            for (StatusSnapshotDTO snap : connHistory.getAggregateSnapshots()) {
-                for (Entry<String, Long> snapEntry : snap.getStatusMetrics().entrySet()) {
-                    LOG.info("Snap " + snapEntry.getKey() + ": " + snapEntry.getValue());
-                }
+            Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
+            StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(statusRepository
+                    .getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
+            List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
+
+            if (aggregateSnapshots.size() < 2) {
+                LOG.info("Not enough data to model time to backpressure.");
+                continue;
             }
+
+            long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
+            LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is "
+                    + Long.toString(backPressureObjectThreshold));
+
+            ConnectionStatusDescriptor.QUEUED_COUNT.getField();
+
+            SimpleRegression regression = new SimpleRegression();
+
+            for (StatusSnapshotDTO snap : aggregateSnapshots) {
+                Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
+                long snapTime = snap.getTimestamp().getTime();
+                regression.addData(snapTime, snapQueuedCount);
+            }
+
+            // Skip this connection if its queue is declining.
+            if (regression.getSlope() <= 0) {
+                LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
+                continue;
+            }
+
+            // Compute time-to backpressure for this connection; Reduce total result iff
+            // this connection is lower.
+            long connTimeToBackpressure = Math
+                    .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
+                    - System.currentTimeMillis();
+            LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
+            minTimeToBackpressure = Math.min(minTimeToBackpressure, connTimeToBackpressure);
         }
 
-        return 0;
+        LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure));
+        return minTimeToBackpressure;
     }
 }