You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/19 13:53:54 UTC
[nifi] 18/21: NIFI-6150 Fixed NaN check and refactored time
prediction. Switched to use non-caching engine for testing
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 9cc3404bab7fe8ccf1c0a63fb0240db87466de58
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Fri Aug 2 08:05:24 2019 -0400
NIFI-6150 Fixed NaN check and refactored time prediction. Switched to use non-caching engine for testing
---
.../org/apache/nifi/controller/FlowController.java | 4 +-
.../analytics/ConnectionStatusAnalytics.java | 57 ++++++++++++----------
2 files changed, 33 insertions(+), 28 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 141a6a4..33ad7f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -150,7 +150,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
-import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalyticsEngine;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
@@ -594,7 +594,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
componentStatusRepository = createComponentStatusRepository();
- analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
+ analyticsEngine = new ConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository);
eventAccess = new StandardEventAccess(this, flowFileEventRepository);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 6b831fa..0cb6b5d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -73,10 +73,12 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
public void refresh() {
-
LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier);
-
- this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
+ if (this.queryWindow == null) {
+ this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
+ } else if (supportsOnlineLearning()) {
+ this.queryWindow = new QueryWindow(queryWindow.getEndTimeMillis(), System.currentTimeMillis());
+ }
modelMap.forEach((metric, modelFunction) -> {
StatusAnalyticsModel model = modelFunction.getKey();
@@ -91,6 +93,18 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
});
}
+ protected Long getTimePrediction(Double prediction, Long timeMillis) {
+
+ if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
+ return -1L;
+ } else if (prediction < timeMillis) {
+ return 0L;
+ } else {
+ return Math.max(0, Math.round(prediction) - timeMillis);
+ }
+
+ }
+
/**
* Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
*
@@ -106,11 +120,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
final double prediction = bytesModel.predictX(backPressureBytes);
- if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) {
- return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
- } else {
- return -1L;
- }
+ return getTimePrediction(prediction, System.currentTimeMillis());
}
@@ -127,13 +137,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
}
final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
- final Double prediction = countModel.predictX(backPressureCountThreshold);
-
- if (prediction != Double.NaN && prediction >= System.currentTimeMillis() && !Double.isInfinite(prediction)) {
- return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
- } else {
- return -1L;
- }
+ final double prediction = countModel.predictX(backPressureCountThreshold);
+ return getTimePrediction(prediction, System.currentTimeMillis());
}
/**
@@ -145,7 +150,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
public Long getNextIntervalBytes() {
final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
- if (prediction != Double.NaN && prediction >= 0) {
+ if (!Double.isNaN(prediction) && prediction >= 0) {
return Math.round(prediction);
} else {
return -1L;
@@ -161,14 +166,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
public Long getNextIntervalCount() {
final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
- if (prediction != Double.NaN && prediction >= 0) {
+ if (!Double.isNaN(prediction) && prediction >= 0) {
return Math.round(prediction);
} else {
return -1L;
}
}
- public Long getNextIntervalPercentageUseCount(){
+ public Long getNextIntervalPercentageUseCount() {
final Connection connection = getConnection();
if (connection == null) {
@@ -177,15 +182,15 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
final long nextIntervalCount = getNextIntervalCount();
- if(nextIntervalCount > -1L) {
+ if (nextIntervalCount > -1L) {
return Math.round((getNextIntervalCount() / backPressureCountThreshold) * 100);
- }else{
+ } else {
return -1L;
}
}
- public Long getNextIntervalPercentageUseBytes(){
+ public Long getNextIntervalPercentageUseBytes() {
final Connection connection = getConnection();
if (connection == null) {
@@ -195,14 +200,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
final long nextIntervalBytes = getNextIntervalBytes();
- if(nextIntervalBytes > -1L) {
+ if (nextIntervalBytes > -1L) {
return Math.round((getNextIntervalBytes() / backPressureBytes) * 100);
- }else{
+ } else {
return -1L;
}
}
- public Long getIntervalTimeMillis(){
+ public Long getIntervalTimeMillis() {
return (5L * 60 * 1000);
}
@@ -226,8 +231,8 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
predictions.put("nextIntervalPercentageUseBytes", getNextIntervalPercentageUseBytes());
predictions.put("intervalTimeMillis", getIntervalTimeMillis());
- predictions.forEach((key,value) -> {
- LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value);
+ predictions.forEach((key, value) -> {
+ LOG.debug("Prediction model for connection id {}: {}={} ", connectionIdentifier, key, value);
});
return predictions;