You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/28 20:20:55 UTC
[nifi] 02/24: NIFI-6510 Implemented basic linear regression model for queue counts
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 22a15bd0ab9e6c3ddfe61cbe617adc85f94a8dc0
Author: Andrew I. Christianson <an...@andyic.org>
AuthorDate: Thu Jul 11 12:02:22 2019 -0400
NIFI-6510 Implemented basic linear regression model for queue counts
---
.../nifi-framework/nifi-framework-core/pom.xml | 5 +++
.../org/apache/nifi/controller/FlowController.java | 2 +-
.../status/analytics/StatusAnalyticEngine.java | 52 ++++++++++++++++++----
3 files changed, 49 insertions(+), 10 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index a1bff42..6551d54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -134,6 +134,11 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>3.6.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-data-provenance-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4c0288f..f7ed734 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -608,7 +608,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
@Override
public void run() {
try {
- analyticsEngine.getMinTimeToBackpressure();
+ analyticsEngine.getMinTimeToBackpressureMillis();
} catch (final Exception e) {
LOG.error("Failed to capture component stats for Stats History", e);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 8b69ebf..0602a93 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -16,12 +16,14 @@
*/
package org.apache.nifi.controller.status.analytics;
+import java.util.Date;
import java.util.List;
-import java.util.Map.Entry;
+import org.apache.commons.math3.stat.regression.SimpleRegression;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
@@ -40,21 +42,53 @@ public class StatusAnalyticEngine {
this.statusRepository = statusRepository;
}
- public long getMinTimeToBackpressure() {
+ public long getMinTimeToBackpressureMillis() {
ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
List<Connection> allConnections = rootGroup.findAllConnections();
+ long minTimeToBackpressure = Long.MAX_VALUE;
for (Connection conn : allConnections) {
LOG.info("Getting connection history for: " + conn.getIdentifier());
- StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
- statusRepository.getConnectionStatusHistory(conn.getIdentifier(), null, null, Integer.MAX_VALUE));
- for (StatusSnapshotDTO snap : connHistory.getAggregateSnapshots()) {
- for (Entry<String, Long> snapEntry : snap.getStatusMetrics().entrySet()) {
- LOG.info("Snap " + snapEntry.getKey() + ": " + snapEntry.getValue());
- }
+ Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
+ StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(statusRepository
+ .getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
+ List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
+
+ if (aggregateSnapshots.size() < 2) {
+ LOG.info("Not enough data to model time to backpressure.");
+ continue;
}
+
+ long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
+ LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is "
+ + Long.toString(backPressureObjectThreshold));
+
+ ConnectionStatusDescriptor.QUEUED_COUNT.getField();
+
+ SimpleRegression regression = new SimpleRegression();
+
+ for (StatusSnapshotDTO snap : aggregateSnapshots) {
+ Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
+ long snapTime = snap.getTimestamp().getTime();
+ regression.addData(snapTime, snapQueuedCount);
+ }
+
+ // Skip this connection if its queue is declining.
+ if (regression.getSlope() <= 0) {
+ LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
+ continue;
+ }
+
+ // Compute time-to backpressure for this connection; Reduce total result iff
+ // this connection is lower.
+ long connTimeToBackpressure = Math
+ .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
+ - System.currentTimeMillis();
+ LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
+ minTimeToBackpressure = Math.min(minTimeToBackpressure, connTimeToBackpressure);
}
- return 0;
+ LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure));
+ return minTimeToBackpressure;
}
}