You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:43:08 UTC
[nifi] 14/23: NIFI-6510 Changes to inject flowManager instead of flow controller, also changes to properly reflect when predictions can be made vs not.
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit c98ae35ddcdbbb56e79fdedd2f39bb21ef8d417e
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Wed Jul 31 08:57:15 2019 -0400
NIFI-6510 Changes to inject flowManager instead of flow controller, also changes to properly reflect when predictions can be made vs not.
(cherry picked from commit 6fae058)
---
.../org/apache/nifi/controller/FlowController.java | 2 +-
.../CachingConnectionStatusAnalyticsEngine.java | 16 ++--
.../analytics/ConnectionStatusAnalytics.java | 89 +++++++++++++---------
.../analytics/ConnectionStatusAnalyticsEngine.java | 11 ++-
.../status/analytics/SimpleRegressionBSAM.java | 18 ++++-
5 files changed, 81 insertions(+), 55 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index b5de777..141a6a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -594,7 +594,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
componentStatusRepository = createComponentStatusRepository();
- analyticsEngine = new CachingConnectionStatusAnalyticsEngine(this, componentStatusRepository);
+ analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
eventAccess = new StandardEventAccess(this, flowFileEventRepository);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
index c4836c6..f69ed33 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
@@ -18,7 +18,7 @@ package org.apache.nifi.controller.status.analytics;
import java.util.concurrent.TimeUnit;
-import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,15 +28,15 @@ import com.github.benmanes.caffeine.cache.Caffeine;
public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
private ComponentStatusRepository statusRepository;
- private FlowController controller;
+ private FlowManager flowManager;
private volatile Cache<String, ConnectionStatusAnalytics> cache;
private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class);
- public CachingConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) {
- this.controller = controller;
+ public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) {
+ this.flowManager = flowManager;
this.statusRepository = statusRepository;
this.cache = Caffeine.newBuilder()
- .expireAfterWrite(5, TimeUnit.MINUTES)
+ .expireAfterWrite(30, TimeUnit.MINUTES)
.build();
}
@@ -45,12 +45,12 @@ public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEn
ConnectionStatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier);
if(connectionStatusAnalytics == null){
- LOG.info("Creating new analytics for connection id: {0}", identifier);
- connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier);
+ LOG.debug("Creating new status analytics object for connection id: {}", identifier);
+ connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,identifier,true);
connectionStatusAnalytics.init();
cache.put(identifier,connectionStatusAnalytics);
}else{
- LOG.info("Pulled existing analytics from cache for connection id: {}", identifier);
+ LOG.debug("Pulled existing analytics from cache for connection id: {}", identifier);
connectionStatusAnalytics.refresh();
}
return connectionStatusAnalytics;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index bad2ff1..8b7964e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -25,7 +25,7 @@ import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
import org.apache.nifi.controller.status.history.StatusHistory;
@@ -45,24 +45,27 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
private QueryWindow queryWindow;
private final ComponentStatusRepository componentStatusRepository;
private final String connectionIdentifier;
- private final FlowController flowController;
+ private final FlowManager flowManager;
+ private final Boolean supportOnlineLearning;
- public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowController flowController, String connectionIdentifier) {
+ public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, String connectionIdentifier, Boolean supportOnlineLearning) {
this.componentStatusRepository = componentStatusRepository;
- this.flowController = flowController;
+ this.flowManager = flowManager;
this.connectionIdentifier = connectionIdentifier;
+ this.supportOnlineLearning = supportOnlineLearning;
}
public void init() {
+ LOG.debug("Initialize analytics connection id: {} ", connectionIdentifier);
+
if (this.modelMap == null || this.modelMap.isEmpty()) {
- Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract);
- Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract);
+ Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(!supportsOnlineLearning()), extract);
+ Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(!supportsOnlineLearning()), extract);
this.modelMap = new HashMap<>();
//TODO: Should change keys used here
this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), countModelFunction);
this.modelMap.put(ConnectionStatusDescriptor.QUEUED_BYTES.getField(), byteModelFunction);
- this.queryWindow = new QueryWindow(System.currentTimeMillis() - (5 * 60 * 1000), System.currentTimeMillis());
}
refresh();
@@ -70,17 +73,20 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
public void refresh() {
+
+ LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier);
+
+ this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
modelMap.forEach((metric, modelFunction) -> {
StatusAnalyticsModel model = modelFunction.getKey();
ExtractFunction extract = modelFunction.getValue();
StatusHistory statusHistory = componentStatusRepository.getConnectionStatusHistory(connectionIdentifier, queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), Integer.MAX_VALUE);
Tuple<Stream<Double>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory);
- LOG.info("Refreshing model for connection id: {} ", connectionIdentifier);
Stream<Double> times = modelData.getKey();
- Stream<Double> counts = modelData.getValue();
- //times is the X axis and counts is on the y axis
- model.learn(times, counts);
+ Stream<Double> values = modelData.getValue();
+ //times are the X axis and values are on the y axis
+ model.learn(times, values);
});
}
@@ -90,7 +96,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
*
* @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
*/
- public long getTimeToBytesBackpressureMillis() {
+ public Long getTimeToBytesBackpressureMillis() {
final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
final Connection connection = getConnection();
@@ -100,10 +106,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
final double prediction = bytesModel.predictX(backPressureBytes);
- if (prediction != Double.NaN) {
+ if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) {
return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
} else {
- return Long.MAX_VALUE;
+ return -1L;
}
}
@@ -113,7 +119,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
*
* @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
*/
- public long getTimeToCountBackpressureMillis() {
+ public Long getTimeToCountBackpressureMillis() {
final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
final Connection connection = getConnection();
@@ -123,10 +129,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
final Double prediction = countModel.predictX(backPressureCountThreshold);
- if (prediction != Double.NaN) {
+ if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) {
return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
} else {
- return Long.MAX_VALUE;
+ return -1L;
}
}
@@ -136,13 +142,13 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
* @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
*/
- public long getNextIntervalBytes() {
+ public Long getNextIntervalBytes() {
final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
- if (prediction != Double.NaN) {
+ if (prediction != Double.NaN && prediction >= 0) {
return Math.round(prediction);
} else {
- return 0;
+ return -1L;
}
}
@@ -152,29 +158,34 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
* @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
*/
- public int getNextIntervalCount() {
+ public Long getNextIntervalCount() {
final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
- if (prediction != Double.NaN) {
- return ((Long) Math.round(prediction)).intValue();
+ if (prediction != Double.NaN && prediction >= 0) {
+ return Math.round(prediction);
} else {
- return 0;
+ return -1L;
}
}
- public int getNextIntervalPercentageUseCount(){
+ public Long getNextIntervalPercentageUseCount(){
final Connection connection = getConnection();
if (connection == null) {
throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
}
final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
+ final long nextIntervalCount = getNextIntervalCount();
- return ((Long)Math.round((getNextIntervalCount()/backPressureCountThreshold) * 100)).intValue();
+ if(nextIntervalCount > -1L) {
+ return Math.round((getNextIntervalCount() / backPressureCountThreshold) * 100);
+ }else{
+ return -1L;
+ }
}
- public int getNextIntervalPercentageUseBytes(){
+ public Long getNextIntervalPercentageUseBytes(){
final Connection connection = getConnection();
if (connection == null) {
@@ -182,13 +193,17 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
}
final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
+ final long nextIntervalBytes = getNextIntervalBytes();
- return ((Long)Math.round((getNextIntervalBytes()/ backPressureBytes) * 100)).intValue();
-
+ if(nextIntervalBytes > -1L) {
+ return Math.round((getNextIntervalBytes() / backPressureBytes) * 100);
+ }else{
+ return -1L;
+ }
}
- public long getIntervalTimeMillis(){
- return getQueryWindow().getTimeDifferenceMillis();
+ public Long getIntervalTimeMillis(){
+ return (5L * 60 * 1000);
}
@Override
@@ -206,13 +221,13 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
predictions.put("timeToBytesBackpressureMillis", getTimeToBytesBackpressureMillis());
predictions.put("timeToCountBackpressureMillis", getTimeToCountBackpressureMillis());
predictions.put("nextIntervalBytes", getNextIntervalBytes());
- predictions.put("nextIntervalCount", (long) getNextIntervalCount());
- predictions.put("nextIntervalPercentageUseCount", (long)getNextIntervalPercentageUseCount());
- predictions.put("nextIntervalPercentageUseBytes", (long)getNextIntervalPercentageUseBytes());
+ predictions.put("nextIntervalCount", getNextIntervalCount());
+ predictions.put("nextIntervalPercentageUseCount", getNextIntervalPercentageUseCount());
+ predictions.put("nextIntervalPercentageUseBytes", getNextIntervalPercentageUseBytes());
predictions.put("intervalTimeMillis", getIntervalTimeMillis());
predictions.forEach((key,value) -> {
- LOG.info("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value);
+ LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value);
});
return predictions;
@@ -220,11 +235,11 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
@Override
public boolean supportsOnlineLearning() {
- return true;
+ return supportOnlineLearning;
}
private Connection getConnection() {
- final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+ final ProcessGroup rootGroup = flowManager.getRootGroup();
Optional<Connection> connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst();
return connection.orElse(null);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
index 7f9db25..8f1222c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
@@ -16,28 +16,27 @@
*/
package org.apache.nifi.controller.status.analytics;
-import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
private ComponentStatusRepository statusRepository;
- private FlowController controller;
+ private FlowManager flowManager;
private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class);
- public ConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) {
- this.controller = controller;
+ public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository) {
+ this.flowManager = flowManager;
this.statusRepository = statusRepository;
}
@Override
public StatusAnalytics getStatusAnalytics(String identifier) {
- ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier);
+ ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,identifier,false);
connectionStatusAnalytics.init();
return connectionStatusAnalytics;
}
-
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
index 8aa4a45..21c0370 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
@@ -21,21 +21,33 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.math3.stat.regression.SimpleRegression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel {
- private SimpleRegression regression;
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleRegressionBSAM.class);
+ private final SimpleRegression regression;
+ private final Boolean clearObservationsOnLearn;
+
+ public SimpleRegressionBSAM(Boolean clearObservationsOnLearn) {
- public SimpleRegressionBSAM() {
this.regression = new SimpleRegression();
+ this.clearObservationsOnLearn = clearObservationsOnLearn;
}
@Override
public void learn(Stream<Double> features, Stream<Double> labels) {
double[] labelArray = ArrayUtils.toPrimitive(labels.toArray(Double[]::new));
double[][] featuresMatrix = features.map(feature -> new double[]{feature}).toArray(double[][]::new);
- regression.clear();
+
+ if(clearObservationsOnLearn) {
+ regression.clear();
+ }
+
regression.addObservations(featuresMatrix, labelArray);
+ LOG.debug("Model is using equation: y = {}x + {}", regression.getSlope(), regression.getIntercept());
+
}
@Override