You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/28 20:21:05 UTC
[nifi] 12/24: NIFI-6510 Analytics Framework Introduction (#10)
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 7bc646751b3065e17a9f04fc572ec8d5a5cbea42
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Mon Jul 29 07:16:21 2019 -0400
NIFI-6510 Analytics Framework Introduction (#10)
* DFA-9 - Initial refactor for Status Analytics - created additional interfaces for models, refactored callers to use StatusAnalytics objects with connection context. Implemented SimpleRegression model.
DFA-9 - added logging
* DFA-9 - relocated query window to CSA from model, adding the prediction percentages and time interval
* DFA-9 - checkstyle fixes
---
.../nifi/controller/status/ConnectionStatus.java | 19 ++
.../controller/status/analytics/QueryWindow.java | 59 +++++
.../status/analytics/StatusAnalytics.java | 26 +--
.../status/analytics/StatusAnalyticsEngine.java | 3 +-
...tusAnalytics.java => StatusAnalyticsModel.java} | 17 +-
.../status/ConnectionStatisticsSnapshotDTO.java | 37 ++-
.../analytics/BivariateStatusAnalyticsModel.java | 30 ++-
.../CachingConnectionStatusAnalyticsEngine.java | 147 ++----------
.../analytics/ConnectionStatusAnalytics.java | 254 +++++++++++++++++++++
.../analytics/ConnectionStatusAnalyticsEngine.java | 121 +---------
.../status/analytics/SimpleRegressionBSAM.java | 55 +++++
.../apache/nifi/reporting/StandardEventAccess.java | 15 +-
.../apache/nifi/web/StandardNiFiServiceFacade.java | 60 ++---
.../org/apache/nifi/web/api/dto/DtoFactory.java | 98 ++++----
.../nifi/web/controller/ControllerFacade.java | 59 ++---
15 files changed, 595 insertions(+), 405 deletions(-)
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
index ee7dd45..783677a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
@@ -45,6 +45,8 @@ public class ConnectionStatus implements Cloneable {
private long nextPredictedQueuedBytes;
private long predictedTimeToCountBackpressureMillis;
private long predictedTimeToBytesBackpressureMillis;
+ private int predictedPercentCount = 0;
+ private int predictedPercentBytes = 0;
public String getId() {
return id;
@@ -231,6 +233,23 @@ public class ConnectionStatus implements Cloneable {
this.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis;
}
+ public int getPredictedPercentCount() {
+ return predictedPercentCount;
+ }
+
+ public void setPredictedPercentCount(int predictedPercentCount) {
+ this.predictedPercentCount = predictedPercentCount;
+ }
+
+ public int getPredictedPercentBytes() {
+ return predictedPercentBytes;
+ }
+
+ public void setPredictedPercentBytes(int predictedPercentBytes) {
+ this.predictedPercentBytes = predictedPercentBytes;
+ }
+
+
@Override
public ConnectionStatus clone() {
final ConnectionStatus clonedObj = new ConnectionStatus();
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java
new file mode 100644
index 0000000..c477872
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/QueryWindow.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.Date;
+
+public class QueryWindow {
+
+ private long startTimeMillis;
+ private long endTimeMillis;
+
+ public QueryWindow(long startTimeMillis, long endTimeMillis) {
+ this.startTimeMillis = startTimeMillis;
+ this.endTimeMillis = endTimeMillis;
+ }
+
+ public long getStartTimeMillis() {
+ return startTimeMillis;
+ }
+
+ public void setStartTimeMillis(long startTimeMillis) {
+ this.startTimeMillis = startTimeMillis;
+ }
+
+ public long getEndTimeMillis() {
+ return endTimeMillis;
+ }
+
+ public void setEndTimeMillis(long endTimeMillis) {
+ this.endTimeMillis = endTimeMillis;
+ }
+
+ public Date getStartDateTime() {
+ return new Date(startTimeMillis);
+ }
+
+ public Date getEndDateTime() {
+ return new Date(endTimeMillis);
+ }
+
+ public long getTimeDifferenceMillis(){
+ return endTimeMillis - startTimeMillis;
+ }
+
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index 564f1c9..a65629f 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -16,32 +16,16 @@
*/
package org.apache.nifi.controller.status.analytics;
+import java.util.Map;
+
/**
* The StatusAnalytics interface offers methods for accessing predicted and other values for a single component (Connection instance, e.g.)
*/
public interface StatusAnalytics {
- /**
- * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
- * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
- */
- long getTimeToBytesBackpressureMillis();
-
- /**
- * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
- * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
- */
- long getTimeToCountBackpressureMillis();
+ QueryWindow getQueryWindow();
+ Map<String,Long> getPredictions();
+ boolean supportsOnlineLearning();
- /**
- * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
- * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
- */
- long getNextIntervalBytes();
- /**
- * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
- * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
- */
- int getNextIntervalCount();
}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java
index 5cbc333..01021c2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsEngine.java
@@ -18,5 +18,6 @@ package org.apache.nifi.controller.status.analytics;
public interface StatusAnalyticsEngine {
- ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId);
+ StatusAnalytics getStatusAnalytics(String componentId);
+
}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
similarity index 66%
copy from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
copy to nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
index 9792ae4..72c81b1 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticsModel.java
@@ -16,17 +16,12 @@
*/
package org.apache.nifi.controller.status.analytics;
+import java.util.stream.Stream;
-/**
- * The ConnectionStatusAnalytics interface offers additional methods to the StatusAnalytics interface related to the supporting connection information (group ID, e.g.)
- */
-public interface ConnectionStatusAnalytics extends StatusAnalytics{
+public interface StatusAnalyticsModel {
+
+ void learn(Stream<Double> features, Stream<Double> labels);
+ Double predict(Double feature);
+ Boolean supportsOnlineLearning();
- String getGroupId();
- String getId();
- String getName();
- String getSourceId();
- String getSourceName();
- String getDestinationId();
- String getDestinationName();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
index 526bdcf..a521db4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
@@ -16,10 +16,10 @@
*/
package org.apache.nifi.web.api.dto.status;
-import io.swagger.annotations.ApiModelProperty;
-
import javax.xml.bind.annotation.XmlType;
+import io.swagger.annotations.ApiModelProperty;
+
/**
* DTO for serializing the statistics of a connection.
*/
@@ -39,6 +39,9 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
private Long predictedMillisUntilBytesBackpressure = 0L;
private Integer predictedCountAtNextInterval = 0;
private Long predictedBytesAtNextInterval = 0L;
+ private Integer predictedPercentCount = 0;
+ private Integer predictedPercentBytes = 0;
+ private Long predictionIntervalMillis = 0L;
/* getters / setters */
/**
@@ -161,6 +164,33 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
this.predictedBytesAtNextInterval = predictedBytesAtNextInterval;
}
+ @ApiModelProperty("The predicted percentage of queued objects at the next configured interval.")
+ public Integer getPredictedPercentCount() {
+ return predictedPercentCount;
+ }
+
+ public void setPredictedPercentCount(Integer predictedPercentCount) {
+ this.predictedPercentCount = predictedPercentCount;
+ }
+
+ @ApiModelProperty("The predicted percentage of bytes in the queue against current threshold at the next configured interval.")
+ public Integer getPredictedPercentBytes() {
+ return predictedPercentBytes;
+ }
+
+ public void setPredictedPercentBytes(Integer predictedPercentBytes) {
+ this.predictedPercentBytes = predictedPercentBytes;
+ }
+
+ @ApiModelProperty("The prediction interval in seconds")
+ public Long getPredictionIntervalMillis() {
+ return predictionIntervalMillis;
+ }
+
+ public void setPredictionIntervalMillis(Long predictionIntervalMillis) {
+ this.predictionIntervalMillis = predictionIntervalMillis;
+ }
+
@Override
public ConnectionStatisticsSnapshotDTO clone() {
final ConnectionStatisticsSnapshotDTO other = new ConnectionStatisticsSnapshotDTO();
@@ -176,6 +206,9 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure());
other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval());
other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval());
+ other.setPredictedPercentCount(getPredictedPercentCount());
+ other.setPredictedPercentBytes(getPredictedPercentBytes());
+ other.setPredictionIntervalMillis(getPredictionIntervalMillis());
return other;
}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
similarity index 66%
rename from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
index 9792ae4..eff661b 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/BivariateStatusAnalyticsModel.java
@@ -16,17 +16,23 @@
*/
package org.apache.nifi.controller.status.analytics;
+import java.util.stream.Stream;
+
+public abstract class BivariateStatusAnalyticsModel implements StatusAnalyticsModel {
+
+
+ public abstract void learn(Stream<Double> features, Stream<Double> labels);
+
+ public abstract Double predict(Double feature);
+
+ public abstract Double predictX(Double y);
+
+ public abstract Double predictY(Double x);
+
+ @Override
+ public Boolean supportsOnlineLearning() {
+ return false;
+ }
+
-/**
- * The ConnectionStatusAnalytics interface offers additional methods to the StatusAnalytics interface related to the supporting connection information (group ID, e.g.)
- */
-public interface ConnectionStatusAnalytics extends StatusAnalytics{
-
- String getGroupId();
- String getId();
- String getName();
- String getSourceId();
- String getSourceName();
- String getDestinationId();
- String getDestinationName();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
index c12dbae..c4836c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingConnectionStatusAnalyticsEngine.java
@@ -16,19 +16,10 @@
*/
package org.apache.nifi.controller.status.analytics;
-import java.util.Date;
-import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.math3.stat.regression.SimpleRegression;
-import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
-import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
-import org.apache.nifi.controller.status.history.StatusHistoryUtil;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,141 +29,33 @@ import com.github.benmanes.caffeine.cache.Caffeine;
public class CachingConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
private ComponentStatusRepository statusRepository;
private FlowController controller;
- private volatile Cache<String, SimpleRegression> cache;
- private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalyticsEngine.class);
+ private volatile Cache<String, ConnectionStatusAnalytics> cache;
+ private static final Logger LOG = LoggerFactory.getLogger(CachingConnectionStatusAnalyticsEngine.class);
public CachingConnectionStatusAnalyticsEngine(FlowController controller, ComponentStatusRepository statusRepository) {
this.controller = controller;
this.statusRepository = statusRepository;
this.cache = Caffeine.newBuilder()
- .expireAfterWrite(1, TimeUnit.MINUTES)
+ .expireAfterWrite(5, TimeUnit.MINUTES)
.build();
}
@Override
- public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) {
-
- ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
- Connection connection = rootGroup.findConnection(connectionId);
- SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier());
-
- if (cachedRegression == null) {
- cachedRegression = getBackPressureRegressionModel(connection);
- if (cachedRegression != null)
- cache.put(connection.getIdentifier(), cachedRegression);
+ public StatusAnalytics getStatusAnalytics(String identifier) {
+
+ ConnectionStatusAnalytics connectionStatusAnalytics = cache.getIfPresent(identifier);
+ if(connectionStatusAnalytics == null){
+ LOG.info("Creating new analytics for connection id: {0}", identifier);
+ connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier);
+ connectionStatusAnalytics.init();
+ cache.put(identifier,connectionStatusAnalytics);
+ }else{
+ LOG.info("Pulled existing analytics from cache for connection id: {}", identifier);
+ connectionStatusAnalytics.refresh();
}
-
- ConnectionStatusAnalytics cachedResult = calculate(cachedRegression, connection);
- LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getTimeToCountBackpressureMillis());
- return cachedResult;
- }
-
- protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn) {
- long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
-
- final long connTimeToBackpressure;
-
- if (regression == null) {
- connTimeToBackpressure = Long.MAX_VALUE;
- } else {
- //If calculation returns as negative only 0 will return
- connTimeToBackpressure = Math.max(0, Math.round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
- - System.currentTimeMillis());
- }
-
- return new ConnectionStatusAnalytics() {
-
- @Override
- public String getSourceName() {
- return conn.getSource().getName();
- }
-
- @Override
- public String getSourceId() {
- return conn.getSource().getIdentifier();
- }
-
- @Override
- public String getName() {
- return conn.getName();
- }
-
-
- @Override
- public String getId() {
- return conn.getIdentifier();
- }
-
- @Override
- public long getTimeToBytesBackpressureMillis() {
- return 0;
- }
-
- @Override
- public long getTimeToCountBackpressureMillis() {
- return connTimeToBackpressure;
- }
-
- @Override
- public long getNextIntervalBytes() {
- return 0;
- }
-
- @Override
- public int getNextIntervalCount() {
- return 0;
- }
-
- @Override
- public String getGroupId() {
- return conn.getProcessGroup().getIdentifier();
- }
-
- @Override
- public String getDestinationName() {
- return conn.getDestination().getName();
- }
-
- @Override
- public String getDestinationId() {
- return conn.getDestination().getIdentifier();
- }
- };
+ return connectionStatusAnalytics;
}
- /**
- * Get backpressure model based on current data
- *
- * @param conn the connection to run the analytic on
- * @return
- */
- protected SimpleRegression getBackPressureRegressionModel(Connection conn) {
- Date minDate = new Date(System.currentTimeMillis() - (60 * 1000));
- StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
- statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
- List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
-
- if (aggregateSnapshots.size() < 2) {
- LOG.info("Not enough data to model time to backpressure.");
- return null;
- } else {
-
- ConnectionStatusDescriptor.QUEUED_COUNT.getField();
- SimpleRegression regression = new SimpleRegression();
- for (StatusSnapshotDTO snap : aggregateSnapshots) {
- Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
- long snapTime = snap.getTimestamp().getTime();
- regression.addData(snapTime, snapQueuedCount);
- }
-
- if (regression.getSlope() <= 0 && !conn.getFlowFileQueue().isFull()) {
- LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
- return null;
- } else {
- return regression;
- }
- }
- }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
new file mode 100644
index 0000000..bad2ff1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.util.Tuple;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionStatusAnalytics implements StatusAnalytics {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalytics.class);
+ private Map<String, Tuple<StatusAnalyticsModel, ExtractFunction>> modelMap;
+ private QueryWindow queryWindow;
+ private final ComponentStatusRepository componentStatusRepository;
+ private final String connectionIdentifier;
+ private final FlowController flowController;
+
+ public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowController flowController, String connectionIdentifier) {
+ this.componentStatusRepository = componentStatusRepository;
+ this.flowController = flowController;
+ this.connectionIdentifier = connectionIdentifier;
+ }
+
+ public void init() {
+
+ if (this.modelMap == null || this.modelMap.isEmpty()) {
+ Tuple<StatusAnalyticsModel, ExtractFunction> countModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract);
+ Tuple<StatusAnalyticsModel, ExtractFunction> byteModelFunction = new Tuple<>(new SimpleRegressionBSAM(), extract);
+ this.modelMap = new HashMap<>();
+ //TODO: Should change keys used here
+ this.modelMap.put(ConnectionStatusDescriptor.QUEUED_COUNT.getField(), countModelFunction);
+ this.modelMap.put(ConnectionStatusDescriptor.QUEUED_BYTES.getField(), byteModelFunction);
+ this.queryWindow = new QueryWindow(System.currentTimeMillis() - (5 * 60 * 1000), System.currentTimeMillis());
+ }
+
+ refresh();
+ }
+
+ public void refresh() {
+
+ modelMap.forEach((metric, modelFunction) -> {
+
+ StatusAnalyticsModel model = modelFunction.getKey();
+ ExtractFunction extract = modelFunction.getValue();
+ StatusHistory statusHistory = componentStatusRepository.getConnectionStatusHistory(connectionIdentifier, queryWindow.getStartDateTime(), queryWindow.getEndDateTime(), Integer.MAX_VALUE);
+ Tuple<Stream<Double>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory);
+ LOG.info("Refreshing model for connection id: {} ", connectionIdentifier);
+ Stream<Double> times = modelData.getKey();
+ Stream<Double> counts = modelData.getValue();
+ //times is the X axis and counts is on the y axis
+ model.learn(times, counts);
+
+ });
+ }
+
+ /**
+ * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
+ *
+ * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+ */
+ public long getTimeToBytesBackpressureMillis() {
+
+ final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+ final Connection connection = getConnection();
+ if (connection == null) {
+ throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
+ }
+ final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
+ final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
+ final double prediction = bytesModel.predictX(backPressureBytes);
+ if (prediction != Double.NaN) {
+ return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
+ } else {
+ return Long.MAX_VALUE;
+ }
+
+ }
+
+ /**
+ * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
+ *
+ * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
+ */
+ public long getTimeToCountBackpressureMillis() {
+
+ final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+ final Connection connection = getConnection();
+ if (connection == null) {
+ throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
+ }
+ final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
+ final Double prediction = countModel.predictX(backPressureCountThreshold);
+
+ if (prediction != Double.NaN) {
+ return Math.max(0, Math.round(prediction) - System.currentTimeMillis());
+ } else {
+ return Long.MAX_VALUE;
+ }
+ }
+
+ /**
+ * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+ *
+ * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+ */
+
+ public long getNextIntervalBytes() {
+ final BivariateStatusAnalyticsModel bytesModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_BYTES.getField()).getKey();
+ final Double prediction = bytesModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
+ if (prediction != Double.NaN) {
+ return Math.round(prediction);
+ } else {
+ return 0;
+ }
+ }
+
+ /**
+ * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+ *
+ * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
+ */
+
+ public int getNextIntervalCount() {
+ final BivariateStatusAnalyticsModel countModel = (BivariateStatusAnalyticsModel) modelMap.get(ConnectionStatusDescriptor.QUEUED_COUNT.getField()).getKey();
+ final Double prediction = countModel.predictY((double) System.currentTimeMillis() + getIntervalTimeMillis());
+ if (prediction != Double.NaN) {
+ return ((Long) Math.round(prediction)).intValue();
+ } else {
+ return 0;
+ }
+ }
+
+ public int getNextIntervalPercentageUseCount(){
+
+ final Connection connection = getConnection();
+ if (connection == null) {
+ throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
+ }
+ final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
+
+ return ((Long)Math.round((getNextIntervalCount()/backPressureCountThreshold) * 100)).intValue();
+
+ }
+
+ public int getNextIntervalPercentageUseBytes(){
+
+ final Connection connection = getConnection();
+ if (connection == null) {
+ throw new NoSuchElementException("Connection with the following id cannot be found:" + connectionIdentifier + ". Model should be invalidated!");
+ }
+ final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
+ final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
+
+ return ((Long)Math.round((getNextIntervalBytes()/ backPressureBytes) * 100)).intValue();
+
+ }
+
+ public long getIntervalTimeMillis(){
+ return getQueryWindow().getTimeDifferenceMillis();
+ }
+
+ @Override
+ public QueryWindow getQueryWindow() {
+ return queryWindow;
+ }
+
+ /**
+ * Returns all available predictions
+ */
+ @Override
+ public Map<String, Long> getPredictions() {
+
+ Map<String, Long> predictions = new HashMap<>();
+ predictions.put("timeToBytesBackpressureMillis", getTimeToBytesBackpressureMillis());
+ predictions.put("timeToCountBackpressureMillis", getTimeToCountBackpressureMillis());
+ predictions.put("nextIntervalBytes", getNextIntervalBytes());
+ predictions.put("nextIntervalCount", (long) getNextIntervalCount());
+ predictions.put("nextIntervalPercentageUseCount", (long)getNextIntervalPercentageUseCount());
+ predictions.put("nextIntervalPercentageUseBytes", (long)getNextIntervalPercentageUseBytes());
+ predictions.put("intervalTimeMillis", getIntervalTimeMillis());
+
+ predictions.forEach((key,value) -> {
+ LOG.info("Prediction model for connection id {}: {}={} ", connectionIdentifier,key,value);
+ });
+
+ return predictions;
+ }
+
+ @Override
+ public boolean supportsOnlineLearning() {
+ return true;
+ }
+
+ private Connection getConnection() {
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+ Optional<Connection> connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst();
+ return connection.orElse(null);
+ }
+
+ private interface ExtractFunction {
+ Tuple<Stream<Double>, Stream<Double>> extractMetric(String metric, StatusHistory statusHistory);
+ }
+
+ private final ExtractFunction extract = (metric, statusHistory) -> {
+
+ List<Double> counts = new ArrayList<>();
+ List<Double> times = new ArrayList<>();
+
+ StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory);
+
+ for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) {
+ Long snapValue = snap.getStatusMetrics().get(metric);
+ long snapTime = snap.getTimestamp().getTime();
+ counts.add((double) snapValue);
+ times.add((double) snapTime);
+ }
+ return new Tuple<>(times.stream(), counts.stream());
+
+ };
+
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
index 11862c8..7f9db25 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalyticsEngine.java
@@ -16,18 +16,8 @@
*/
package org.apache.nifi.controller.status.analytics;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.math3.stat.regression.SimpleRegression;
-import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
-import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
-import org.apache.nifi.controller.status.history.StatusHistoryUtil;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,114 +33,11 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
}
@Override
- public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) {
- ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
- return getConnectionStatusAnalytics(rootGroup.findConnection(connectionId));
+ public StatusAnalytics getStatusAnalytics(String identifier) {
+ ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,controller,identifier);
+ connectionStatusAnalytics.init();
+ return connectionStatusAnalytics;
}
- /**
- * Finds the number of millis until the given connection will experience backpressure.
- * @param conn the connection to run the analytic on
- * @return
- */
- public ConnectionStatusAnalytics getConnectionStatusAnalytics(Connection conn) {
- LOG.debug("Getting connection history for: " + conn.getIdentifier());
- long connTimeToBackpressure;
- Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
- StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
- statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
- List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
-
- if (aggregateSnapshots.size() < 2) {
- LOG.info("Not enough data to model time to backpressure.");
- connTimeToBackpressure = Long.MAX_VALUE;
- } else {
-
- long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
- LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is " + backPressureObjectThreshold);
-
- ConnectionStatusDescriptor.QUEUED_COUNT.getField();
-
- SimpleRegression regression = new SimpleRegression();
-
- for (StatusSnapshotDTO snap : aggregateSnapshots) {
- Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
- long snapTime = snap.getTimestamp().getTime();
- regression.addData(snapTime, snapQueuedCount);
- }
-
- // Skip this connection if its queue is declining.
- if (regression.getSlope() <= 0) {
- LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
- connTimeToBackpressure = Long.MAX_VALUE;
- } else {
-
- // Compute time-to backpressure for this connection; Reduce total result iff
- // this connection is lower.
- connTimeToBackpressure = Math
- .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
- - System.currentTimeMillis();
- LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
- }
- }
-
- return new ConnectionStatusAnalytics() {
-
- @Override
- public String getSourceName() {
- return conn.getSource().getName();
- }
- @Override
- public String getSourceId() {
- return conn.getSource().getIdentifier();
- }
-
- @Override
- public String getName() {
- return conn.getName();
- }
-
- @Override
- public long getTimeToCountBackpressureMillis() {
- return connTimeToBackpressure;
- }
-
- // TODO - populate the other prediction fields
- @Override
- public long getTimeToBytesBackpressureMillis() {
- return 0;
- }
-
- @Override
- public long getNextIntervalBytes() {
- return 0;
- }
-
- @Override
- public int getNextIntervalCount() {
- return 0;
- }
-
- @Override
- public String getId() {
- return conn.getIdentifier();
- }
-
- @Override
- public String getGroupId() {
- return conn.getProcessGroup().getIdentifier();
- }
-
- @Override
- public String getDestinationName() {
- return conn.getDestination().getName();
- }
-
- @Override
- public String getDestinationId() {
- return conn.getDestination().getIdentifier();
- }
- };
- }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
new file mode 100644
index 0000000..8aa4a45
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/SimpleRegressionBSAM.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.stat.regression.SimpleRegression;
+
+public class SimpleRegressionBSAM extends BivariateStatusAnalyticsModel {
+
+ private SimpleRegression regression;
+
+ public SimpleRegressionBSAM() {
+ this.regression = new SimpleRegression();
+ }
+
+ @Override
+ public void learn(Stream<Double> features, Stream<Double> labels) {
+ double[] labelArray = ArrayUtils.toPrimitive(labels.toArray(Double[]::new));
+ double[][] featuresMatrix = features.map(feature -> new double[]{feature}).toArray(double[][]::new);
+ regression.clear();
+ regression.addObservations(featuresMatrix, labelArray);
+ }
+
+ @Override
+ public Double predict(Double feature) {
+ return predictY(feature);
+ }
+
+ @Override
+ public Double predictX(Double y) {
+ return (y - regression.getIntercept()) / regression.getSlope();
+ }
+
+ @Override
+ public Double predictY(Double x) {
+ return regression.getSlope() * x + regression.getIntercept();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index 33e650b..1e36975 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -343,12 +344,16 @@ public class StandardEventAccess implements UserAwareEventAccess {
}
if (statusAnalyticsEngine != null) {
- StatusAnalytics statusAnalytics = statusAnalyticsEngine.getConnectionStatusAnalytics(conn.getIdentifier());
+ StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics(conn.getIdentifier());
if (statusAnalytics != null) {
- connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis());
- connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis());
- connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes());
- connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount());
+ Map<String,Long> predictions = statusAnalytics.getPredictions();
+ connStatus.setPredictedTimeToBytesBackpressureMillis(predictions.get("timeToBytesBackpressureMillis"));
+ connStatus.setPredictedTimeToCountBackpressureMillis(predictions.get("timeToCountBackpressureMillis"));
+ connStatus.setNextPredictedQueuedBytes(predictions.get("nextIntervalBytes"));
+ connStatus.setNextPredictedQueuedCount(predictions.get("nextIntervalCount").intValue());
+ connStatus.setPredictedPercentCount(predictions.get("nextIntervalPercentageUseCount").intValue());
+ connStatus.setPredictedPercentBytes(predictions.get("nextIntervalPercentageUseBytes").intValue());
+ connStatus.setPredictionIntervalMillis(predictions.get("intervalTimeMillis"));
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 35f27ec..5e53fda 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,35 @@
*/
package org.apache.nifi.web;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
@@ -310,33 +338,7 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import com.google.common.collect.Sets;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@@ -3198,7 +3200,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
public ConnectionStatisticsEntity getConnectionStatistics(final String connectionId) {
final Connection connection = connectionDAO.getConnection(connectionId);
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
- final ConnectionStatisticsDTO dto = dtoFactory.createConnectionStatisticsDto(controllerFacade.getConnectionStatistics(connectionId));
+ final ConnectionStatisticsDTO dto = dtoFactory.createConnectionStatisticsDto(connection, controllerFacade.getConnectionStatusAnalytics(connectionId));
return entityFactory.createConnectionStatisticsEntity(dto, permissions);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 18781d9..34c49c0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,34 @@
*/
package org.apache.nifi.web.api.dto;
+import java.text.Collator;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@@ -106,7 +134,7 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.diagnostics.GarbageCollection;
@@ -236,33 +264,6 @@ import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
public final class DtoFactory {
@SuppressWarnings("rawtypes")
@@ -1199,30 +1200,35 @@ public final class DtoFactory {
return connectionStatusDto;
}
- public ConnectionStatisticsDTO createConnectionStatisticsDto(final ConnectionStatusAnalytics connectionStatistics) {
+ public ConnectionStatisticsDTO createConnectionStatisticsDto(final Connection connection, final StatusAnalytics statusAnalytics) {
final ConnectionStatisticsDTO connectionStatisticsDTO = new ConnectionStatisticsDTO();
- connectionStatisticsDTO.setGroupId(connectionStatistics.getGroupId());
- connectionStatisticsDTO.setId(connectionStatistics.getId());
- connectionStatisticsDTO.setName(connectionStatistics.getName());
- connectionStatisticsDTO.setSourceId(connectionStatistics.getSourceId());
- connectionStatisticsDTO.setSourceName(connectionStatistics.getSourceName());
- connectionStatisticsDTO.setDestinationId(connectionStatistics.getDestinationId());
- connectionStatisticsDTO.setDestinationName(connectionStatistics.getDestinationName());
+
+ connectionStatisticsDTO.setGroupId(connection.getProcessGroup().getIdentifier());
+ connectionStatisticsDTO.setId(connection.getIdentifier());
+ connectionStatisticsDTO.setName(connection.getName());
+ connectionStatisticsDTO.setSourceId(connection.getSource().getIdentifier());
+ connectionStatisticsDTO.setSourceName(connection.getSource().getName());
+ connectionStatisticsDTO.setDestinationId(connection.getDestination().getIdentifier());
+ connectionStatisticsDTO.setDestinationName(connection.getDestination().getName());
connectionStatisticsDTO.setStatsLastRefreshed(new Date());
final ConnectionStatisticsSnapshotDTO snapshot = new ConnectionStatisticsSnapshotDTO();
connectionStatisticsDTO.setAggregateSnapshot(snapshot);
- snapshot.setId(connectionStatistics.getId());
- snapshot.setGroupId(connectionStatistics.getGroupId());
- snapshot.setName(connectionStatistics.getName());
- snapshot.setSourceName(connectionStatistics.getSourceName());
- snapshot.setDestinationName(connectionStatistics.getDestinationName());
-
- snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatistics.getTimeToBytesBackpressureMillis());
- snapshot.setPredictedMillisUntilCountBackpressure(connectionStatistics.getTimeToCountBackpressureMillis());
- snapshot.setPredictedBytesAtNextInterval(connectionStatistics.getNextIntervalBytes());
- snapshot.setPredictedCountAtNextInterval(connectionStatistics.getNextIntervalCount());
+ snapshot.setId(connection.getIdentifier());
+ snapshot.setGroupId(connection.getProcessGroup().getIdentifier());
+ snapshot.setName(connection.getName());
+ snapshot.setSourceName(connection.getSource().getName());
+ snapshot.setDestinationName(connection.getDestination().getName());
+
+ Map<String,Long> predictions = statusAnalytics.getPredictions();
+ snapshot.setPredictedMillisUntilBytesBackpressure(predictions.get("timeToBytesBackpressureMillis"));
+ snapshot.setPredictedMillisUntilCountBackpressure(predictions.get("timeToCountBackpressureMillis"));
+ snapshot.setPredictedBytesAtNextInterval(predictions.get("nextIntervalBytes"));
+ snapshot.setPredictedCountAtNextInterval(predictions.get("nextIntervalCount").intValue());
+ snapshot.setPredictedPercentBytes(predictions.get("nextIntervalPercentageUseBytes").intValue());
+ snapshot.setPredictedPercentCount(predictions.get("nextIntervalPercentageUseCount").intValue());
+ snapshot.setPredictionIntervalMillis(predictions.get("intervalTimeMillis"));
return connectionStatisticsDTO;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 95f0713..d7714da 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -16,6 +16,30 @@
*/
package org.apache.nifi.web.controller;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@@ -56,7 +80,7 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.diagnostics.SystemDiagnostics;
@@ -112,29 +136,6 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.WebApplicationException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TimeZone;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
public class ControllerFacade implements Authorizable {
private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
@@ -687,12 +688,12 @@ public class ControllerFacade implements Authorizable {
}
/**
- * Gets analytical statistics for the specified connection.
+ * Gets status analytics for the specified connection.
*
* @param connectionId connection id
* @return the statistics for the specified connection
*/
- public ConnectionStatusAnalytics getConnectionStatistics(final String connectionId) {
+ public StatusAnalytics getConnectionStatusAnalytics(final String connectionId) {
final ProcessGroup root = getRootGroup();
final Connection connection = root.findConnection(connectionId);
@@ -709,12 +710,12 @@ public class ControllerFacade implements Authorizable {
}
// get from flow controller
- final StatusAnalyticsEngine status = flowController.getStatusAnalyticsEngine();
- if (status == null) {
+ final StatusAnalyticsEngine statusAnalyticsEngine = flowController.getStatusAnalyticsEngine();
+ if (statusAnalyticsEngine == null) {
throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
}
- return status.getConnectionStatusAnalytics(connectionId);
+ return statusAnalyticsEngine.getStatusAnalytics(connectionId);
}
/**