You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:43:08 UTC

[nifi] 14/23: NIFI-6510 Changes to inject flowManager instead of flow controller, also changes to properly reflect when predictions can be made vs not.

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit c98ae35ddcdbbb56e79fdedd2f39bb21ef8d417e
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Wed Jul 31 08:57:15 2019 -0400

    NIFI-6510 Changes to inject flowManager instead of flow controller, also changes to properly reflect when predictions can be made vs not.
    
    (cherry picked from commit 6fae058)
---
 .../org/apache/nifi/controller/FlowController.java |  2 +-
 .../CachingConnectionStatusAnalyticsEngine.java    | 16 ++--
 .../analytics/ConnectionStatusAnalytics.java       | 89 +++++++++++++---------
 .../analytics/ConnectionStatusAnalyticsEngine.java | 11 ++-
 .../status/analytics/SimpleRegressionBSAM.java     | 18 ++++-
 5 files changed, 81 insertions(+), 55 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index b5de777..141a6a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -594,7 +594,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         }
 
         componentStatusRepository = createComponentStatusRepository();
-        analyticsEngine = new CachingConnectionStatusAnalyticsEngine(this, componentStatusRepository);
+        analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
         eventAccess = new StandardEventAccess(this, flowFileEventRepository);
 
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
index c4836c6..f69ed33 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
@@ -18,7 +18,7 @@ package org.apache.nifi.controller.status.analytics;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,15 +28,15 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 
 public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
     private ComponentStatusRepository statusRepository;
-    private FlowController controller;
+    private FlowManager flowManager;
     private volatile Cache<String, ConnectionStatusAnalytics> cache;
     private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class);
 
-    public CachingConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) {
-        this.controller = controller;
+    public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) {
+        this.flowManager = flowManager;
         this.statusRepository = statusRepository;
         this.cache = Caffeine.newBuilder()
-                .expireAfterWrite(5, TimeUnit.MINUTES)
+                .expireAfterWrite(30, TimeUnit.MINUTES)
                 .build();
     }
 
@@ -45,12 +45,12 @@ public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEn
 
         ConnectionStatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier);
         if(connectionStatusAnalytics == null){
-            LOG.info("Creating new analytics for connection id: {0}", identifier);
-            connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier);
+            LOG.debug("Creating new status analytics object for connection id: {}", identifier);
+            connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,identifier,true);
             connectionStatusAnalytics.init();
             cache.put(identifier,connectionStatusAnalytics);
         }else{
-            LOG.info("Pulled existing analytics from cache for connection id: {}", identifier);
+            LOG.debug("Pulled existing analytics from cache for connection id: {}", identifier);
             connectionStatusAnalytics.refresh();
         }
         return connectionStatusAnalytics;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index bad2ff1..8b7964e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -25,7 +25,7 @@ import java.util.Optional;
 import java.util.stream.Stream;
 
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
 import org.apache.nifi.controller.status.history.StatusHistory;
@@ -45,24 +45,27 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
     private QueryWindow queryWindow;
     private final ComponentStatusRepository componentStatusRepository;
     private final String connectionIdentifier;
-    private final FlowController flowController;
+    private final FlowManager flowManager;
+    private final Boolean supportOnlineLearning;
 
-    public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowController flowController, String connectionIdentifier) {
+    public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, String connectionIdentifier, Boolean supportOnlineLearning) {
         this.componentStatusRepository = componentStatusRepository;
-        this.flowController = flowController;
+        this.flowManager = flowManager;
         this.connectionIdentifier = connectionIdentifier;
+        this.supportOnlineLearning = supportOnlineLearning;
     }
 
     public void init() {
 
+        LOG.debug("Initialize analytics connection id: {} ", connectionIdentifier);
+
         if (this.modelMap == null || this.modelMap.isEmpty()) {
-            Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract);
-            Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract);
+            Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(!supportsOnlineLearning()), extract);
+            Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(!supportsOnlineLearning()), extract);
             this.modelMap = new HashMap<>();
             //TODO: Should change keys used here
             this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), countModelFunction);
             this.modelMap.put(ConnectionStatusDescriptor.QUEUED_BYTES.getField(), byteModelFunction);
-            this.queryWindow = new QueryWindow(System.currentTimeMillis() - (5 * 60 * 1000), System.currentTimeMillis());
         }
 
         refresh();
@@ -70,17 +73,20 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
 
     public void refresh() {
 
+
+        LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier);
+
+        this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
         modelMap.forEach((metric, modelFunction) -> {
 
             StatusAnalyticsModel model = modelFunction.getKey();
             ExtractFunction extract = modelFunction.getValue();
             StatusHistory statusHistory = componentStatusRepository.getConnectionStatusHistory(connectionIdentifier, queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), Integer.MAX_VALUE);
             Tuple<Stream<Double>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory);
-            LOG.info("Refreshing model for connection id: {} ", connectionIdentifier);
             Stream<Double> times = modelData.getKey();
-            Stream<Double> counts = modelData.getValue();
-            //times is the X axis and counts is on the y axis
-            model.learn(times, counts);
+            Stream<Double> values = modelData.getValue();
+            //times are the X axis and values are on the y axis
+            model.learn(times, values);
 
         });
     }
@@ -90,7 +96,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
      *
      * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
      */
-    public long getTimeToBytesBackpressureMillis() {
+    public Long getTimeToBytesBackpressureMillis() {
 
         final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
         final Connection connection = getConnection();
@@ -100,10 +106,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
         final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
         final double prediction = bytesModel.predictX(backPressureBytes);
-        if (prediction != Double.NaN) {
+        if (!Double.isNaN(prediction) && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) { // "!= Double.NaN" is always true; isNaN is the real check
             return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
         } else {
-            return Long.MAX_VALUE;
+            return -1L;
         }
 
     }
@@ -113,7 +119,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
      *
      * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
      */
-    public long getTimeToCountBackpressureMillis() {
+    public Long getTimeToCountBackpressureMillis() {
 
         final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
         final Connection connection = getConnection();
@@ -123,10 +129,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
         final Double prediction = countModel.predictX(backPressureCountThreshold);
 
-        if (prediction != Double.NaN) {
+        if (!Double.isNaN(prediction) && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) { // "!= Double.NaN" is always true; isNaN is the real check
             return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
         } else {
-            return Long.MAX_VALUE;
+            return -1L;
         }
     }
 
@@ -136,13 +142,13 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
      * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
      */
 
-    public long getNextIntervalBytes() {
+    public Long getNextIntervalBytes() {
         final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
         final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
-        if (prediction != Double.NaN) {
+        if (!Double.isNaN(prediction) && prediction >= 0) { // "!= Double.NaN" is always true; isNaN is the real check
             return Math.round(prediction);
         } else {
-            return 0;
+            return -1L;
         }
     }
 
@@ -152,29 +158,34 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
      * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
      */
 
-    public int getNextIntervalCount() {
+    public Long getNextIntervalCount() {
         final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
         final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
-        if (prediction != Double.NaN) {
-            return ((Long) Math.round(prediction)).intValue();
+        if (!Double.isNaN(prediction) && prediction >= 0) { // "!= Double.NaN" is always true; isNaN is the real check
+            return Math.round(prediction);
         } else {
-            return 0;
+            return -1L;
         }
     }
 
-    public int getNextIntervalPercentageUseCount(){
+    public Long getNextIntervalPercentageUseCount(){
 
         final Connection connection = getConnection();
         if (connection == null) {
             throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
         }
         final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
+        final long nextIntervalCount = getNextIntervalCount();
 
-        return ((Long)Math.round((getNextIntervalCount()/backPressureCountThreshold) * 100)).intValue();
+        if(nextIntervalCount > -1L) {
+            return Math.round((nextIntervalCount / backPressureCountThreshold) * 100); // reuse the validated value; a second call is time-dependent and may yield -1
+        }else{
+            return -1L;
+        }
 
     }
 
-    public int getNextIntervalPercentageUseBytes(){
+    public Long getNextIntervalPercentageUseBytes(){
 
         final Connection connection = getConnection();
         if (connection == null) {
@@ -182,13 +193,17 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         }
         final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
         final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
+        final long nextIntervalBytes = getNextIntervalBytes();
 
-        return ((Long)Math.round((getNextIntervalBytes()/ backPressureBytes) * 100)).intValue();
-
+        if(nextIntervalBytes > -1L) {
+            return Math.round((nextIntervalBytes / backPressureBytes) * 100); // reuse the validated value; a second call is time-dependent and may yield -1
+        }else{
+            return -1L;
+        }
     }
 
-    public long getIntervalTimeMillis(){
-        return getQueryWindow().getTimeDifferenceMillis();
+    public Long getIntervalTimeMillis(){
+        return (5L * 60 * 1000);
     }
 
     @Override
@@ -206,13 +221,13 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         predictions.put("timeToBytesBackpressureMillis", getTimeToBytesBackpressureMillis());
         predictions.put("timeToCountBackpressureMillis", getTimeToCountBackpressureMillis());
         predictions.put("nextIntervalBytes", getNextIntervalBytes());
-        predictions.put("nextIntervalCount", (long) getNextIntervalCount());
-        predictions.put("nextIntervalPercentageUseCount", (long)getNextIntervalPercentageUseCount());
-        predictions.put("nextIntervalPercentageUseBytes", (long)getNextIntervalPercentageUseBytes());
+        predictions.put("nextIntervalCount", getNextIntervalCount());
+        predictions.put("nextIntervalPercentageUseCount", getNextIntervalPercentageUseCount());
+        predictions.put("nextIntervalPercentageUseBytes", getNextIntervalPercentageUseBytes());
         predictions.put("intervalTimeMillis", getIntervalTimeMillis());
 
         predictions.forEach((key,value) -> {
-            LOG.info("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value);
+            LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value);
         });
 
         return predictions;
@@ -220,11 +235,11 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
 
     @Override
     public boolean supportsOnlineLearning() {
-        return true;
+        return supportOnlineLearning;
     }
 
     private Connection getConnection() {
-        final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+        final ProcessGroup rootGroup = flowManager.getRootGroup();
         Optional<Connection> connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst();
         return connection.orElse(null);
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
index 7f9db25..8f1222c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
@@ -16,28 +16,27 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
-import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
     private ComponentStatusRepository statusRepository;
-    private FlowController controller;
+    private FlowManager flowManager;
 
     private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class);
 
-    public ConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) {
-        this.controller = controller;
+    public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) {
+        this.flowManager = flowManager;
         this.statusRepository = statusRepository;
     }
 
     @Override
     public StatusAnalytics getStatusAnalytics(String identifier) {
-        ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier);
+        ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,identifier,false);
         connectionStatusAnalytics.init();
         return connectionStatusAnalytics;
     }
 
-
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
index 8aa4a45..21c0370 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
@@ -21,21 +21,33 @@ import java.util.stream.Stream;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.math3.stat.regression.SimpleRegression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel {
 
-    private SimpleRegression regression;
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleRegressionBSAM.class);
+    private final SimpleRegression regression;
+    private final Boolean clearObservationsOnLearn;
+
+    public SimpleRegressionBSAM(Boolean clearObservationsOnLearn) {
 
-    public SimpleRegressionBSAM() {
         this.regression = new SimpleRegression();
+        this.clearObservationsOnLearn = clearObservationsOnLearn;
     }
 
     @Override
     public void learn(Stream<Double> features, Stream<Double> labels) {
         double[] labelArray = ArrayUtils.toPrimitive(labels.toArray(Double[]::new));
         double[][] featuresMatrix = features.map(feature -> new double[]{feature}).toArray(double[][]::new);
-        regression.clear();
+
+        if(clearObservationsOnLearn) {
+            regression.clear();
+        }
+
         regression.addObservations(featuresMatrix, labelArray);
+        LOG.debug("Model is using equation: y = {}x + {}", regression.getSlope(), regression.getIntercept());
+
     }
 
     @Override