You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/19 13:53:54 UTC

[nifi] 18/21: NIFI-6150 Fixed NaN check and refactored time prediction. Switched to use non caching engine for testing

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 9cc3404bab7fe8ccf1c0a63fb0240db87466de58
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Fri Aug 2 08:05:24 2019 -0400

    NIFI-6150 Fixed NaN check and refactored time prediction. Switched to use non caching engine for testing
---
 .../org/apache/nifi/controller/FlowController.java |  4 +-
 .../analytics/ConnectionStatusAnalytics.java       | 57 ++++++++++++----------
 2 files changed, 33 insertions(+), 28 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 141a6a4..33ad7f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -150,7 +150,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
-import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalyticsEngine;
 import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
@@ -594,7 +594,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         }
 
         componentStatusRepository = createComponentStatusRepository();
-        analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
+        analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
         eventAccess = new StandardEventAccess(this, flowFileEventRepository);
 
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 6b831fa..0cb6b5d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -73,10 +73,12 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
 
     public void refresh() {
 
-
         LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier);
-
-        this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
+        if (this.queryWindow == null) {
+            this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
+        } else if (supportsOnlineLearning()) {
+            this.queryWindow = new QueryWindow(queryWindow.getEndTimeMillis(), System.currentTimeMillis());
+        }
         modelMap.forEach((metric, modelFunction) -> {
 
             StatusAnalyticsModel model = modelFunction.getKey();
@@ -91,6 +93,18 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         });
     }
 
+    protected Long getTimePrediction(Double prediction, Long timeMillis) {
+        // Maps a model prediction to an offset from timeMillis (callers in this class pass System.currentTimeMillis()).
+        if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
+            return -1L; // NaN or infinite prediction: -1 marks "no usable prediction"
+        } else if (prediction < timeMillis) {
+            return 0L; // predicted value is already below timeMillis, so report zero rather than a negative offset
+        } else {
+            return Math.max(0, Math.round(prediction) - timeMillis); // rounded prediction minus timeMillis, floored at 0
+        }
+
+    }
+
     /**
      * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
      *
@@ -106,11 +120,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
         final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
         final double prediction = bytesModel.predictX(backPressureBytes);
-        if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) {
-            return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
-        } else {
-            return -1L;
-        }
+        return getTimePrediction(prediction, System.currentTimeMillis());
 
     }
 
@@ -127,13 +137,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
             throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
         }
         final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
-        final Double prediction = countModel.predictX(backPressureCountThreshold);
-
-        if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) {
-            return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
-        } else {
-            return -1L;
-        }
+        final double prediction = countModel.predictX(backPressureCountThreshold);
+        return getTimePrediction(prediction, System.currentTimeMillis());
     }
 
     /**
@@ -145,7 +150,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
     public Long getNextIntervalBytes() {
         final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
         final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
-        if (prediction != Double.NaN && prediction >= 0) {
+        if (!Double.isNaN(prediction) && prediction >= 0) {
             return Math.round(prediction);
         } else {
             return -1L;
@@ -161,14 +166,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
     public Long getNextIntervalCount() {
         final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
         final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
-        if (prediction != Double.NaN && prediction >= 0) {
+        if (!Double.isNaN(prediction) && prediction >= 0) {
             return Math.round(prediction);
         } else {
             return -1L;
         }
     }
 
-    public Long getNextIntervalPercentageUseCount(){
+    public Long getNextIntervalPercentageUseCount() {
 
         final Connection connection = getConnection();
         if (connection == null) {
@@ -177,15 +182,15 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
         final long nextIntervalCount = getNextIntervalCount();
 
-        if(nextIntervalCount > -1L) {
+        if (nextIntervalCount > -1L) {
             return Math.round((getNextIntervalCount() / backPressureCountThreshold) * 100);
-        }else{
+        } else {
             return -1L;
         }
 
     }
 
-    public Long getNextIntervalPercentageUseBytes(){
+    public Long getNextIntervalPercentageUseBytes() {
 
         final Connection connection = getConnection();
         if (connection == null) {
@@ -195,14 +200,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
         final long nextIntervalBytes = getNextIntervalBytes();
 
-        if(nextIntervalBytes > -1L) {
+        if (nextIntervalBytes > -1L) {
             return Math.round((getNextIntervalBytes() / backPressureBytes) * 100);
-        }else{
+        } else {
             return -1L;
         }
     }
 
-    public Long getIntervalTimeMillis(){
+    public Long getIntervalTimeMillis() {
         return (5L * 60 * 1000);
     }
 
@@ -226,8 +231,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         predictions.put("nextIntervalPercentageUseBytes", getNextIntervalPercentageUseBytes());
         predictions.put("intervalTimeMillis", getIntervalTimeMillis());
 
-        predictions.forEach((key,value) -> {
-            LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value);
+        predictions.forEach((key, value) -> {
+            LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier, key, value);
         });
 
         return predictions;