You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/28 20:20:59 UTC
[nifi] 06/24: NIFI-6510 Updated objects and interfaces to reflect 4
prediction metrics
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit cb5825022dfddb268a78efc35431f73c6afd3cab
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jul 22 18:11:58 2019 -0400
NIFI-6510 Updated objects and interfaces to reflect 4 prediction metrics
(cherry picked from commit 050e0fc)
(cherry picked from commit 9fd365f)
---
.../nifi/controller/status/ConnectionStatus.java | 48 ++++++++++++++
.../analytics/ConnectionStatusAnalytics.java | 26 +++++++-
.../status/analytics/StatusAnalytics.java | 30 ++++++++-
.../status/ConnectionStatisticsSnapshotDTO.java | 47 ++++++++++++--
.../dto/status/ConnectionStatusSnapshotDTO.java | 44 +++++++++++++
.../entity/ConnectionStatisticsSnapshotEntity.java | 1 -
.../org/apache/nifi/controller/FlowController.java | 75 +++++++++-------------
.../status/analytics/StatusAnalyticEngine.java | 42 +++++++++++-
.../apache/nifi/reporting/StandardEventAccess.java | 9 +++
.../org/apache/nifi/web/api/dto/DtoFactory.java | 9 ++-
10 files changed, 274 insertions(+), 57 deletions(-)
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
index 341fda2..0e5d306 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
@@ -40,6 +40,10 @@ public class ConnectionStatus implements Cloneable {
private long outputBytes;
private int maxQueuedCount;
private long maxQueuedBytes;
+ private int nextPredictedQueuedCount;
+ private long nextPredictedQueuedBytes;
+ private long predictedTimeToCountBackpressureMillis;
+ private long predictedTimeToBytesBackpressureMillis;
public String getId() {
return id;
@@ -186,6 +190,38 @@ public class ConnectionStatus implements Cloneable {
this.backPressureBytesThreshold = backPressureBytesThreshold;
}
+ public int getNextPredictedQueuedCount() {
+ return nextPredictedQueuedCount;
+ }
+
+ public void setNextPredictedQueuedCount(int nextPredictedQueuedCount) {
+ this.nextPredictedQueuedCount = nextPredictedQueuedCount;
+ }
+
+ public long getNextPredictedQueuedBytes() {
+ return nextPredictedQueuedBytes;
+ }
+
+ public void setNextPredictedQueuedBytes(long nextPredictedQueuedBytes) {
+ this.nextPredictedQueuedBytes = nextPredictedQueuedBytes;
+ }
+
+ public long getPredictedTimeToCountBackpressureMillis() {
+ return predictedTimeToCountBackpressureMillis;
+ }
+
+ public void setPredictedTimeToCountBackpressureMillis(long predictedTimeToCountBackpressureMillis) {
+ this.predictedTimeToCountBackpressureMillis = predictedTimeToCountBackpressureMillis;
+ }
+
+ public long getPredictedTimeToBytesBackpressureMillis() {
+ return predictedTimeToBytesBackpressureMillis;
+ }
+
+ public void setPredictedTimeToBytesBackpressureMillis(long predictedTimeToBytesBackpressureMillis) {
+ this.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis;
+ }
+
@Override
public ConnectionStatus clone() {
final ConnectionStatus clonedObj = new ConnectionStatus();
@@ -206,6 +242,10 @@ public class ConnectionStatus implements Cloneable {
clonedObj.backPressureObjectThreshold = backPressureObjectThreshold;
clonedObj.maxQueuedBytes = maxQueuedBytes;
clonedObj.maxQueuedCount = maxQueuedCount;
+ clonedObj.nextPredictedQueuedBytes = nextPredictedQueuedBytes;
+ clonedObj.nextPredictedQueuedCount = nextPredictedQueuedCount;
+ clonedObj.predictedTimeToBytesBackpressureMillis = predictedTimeToBytesBackpressureMillis;
+ clonedObj.predictedTimeToCountBackpressureMillis = predictedTimeToCountBackpressureMillis;
return clonedObj;
}
@@ -246,6 +286,14 @@ public class ConnectionStatus implements Cloneable {
builder.append(maxQueuedCount);
builder.append(", maxQueueBytes=");
builder.append(maxQueuedBytes);
+ builder.append(", nextPredictedQueuedBytes=");
+ builder.append(nextPredictedQueuedBytes);
+ builder.append(", nextPredictedQueuedCount=");
+ builder.append(nextPredictedQueuedCount);
+ builder.append(", predictedTimeToBytesBackpressureMillis=");
+ builder.append(predictedTimeToBytesBackpressureMillis);
+ builder.append(", predictedTimeToCountBackpressureMillis=");
+ builder.append(predictedTimeToCountBackpressureMillis);
builder.append("]");
return builder.toString();
}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 12c8a15..2380d55 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -17,7 +17,31 @@
package org.apache.nifi.controller.status.analytics;
public interface ConnectionStatusAnalytics {
- long getMinTimeToBackpressureMillis();
+
+ /**
+ * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
+ * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+ */
+ long getTimeToBytesBackpressureMillis();
+
+ /**
+ * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
+ * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
+ */
+ long getTimeToCountBackpressureMillis();
+
+ /**
+ * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+ * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+ */
+ long getNextIntervalBytes();
+
+ /**
+ * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+ * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
+ */
+ int getNextIntervalCount();
+
String getGroupId();
String getId();
String getName();
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index 42c2abd..7d29314 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -21,7 +21,35 @@ package org.apache.nifi.controller.status.analytics;
*/
public interface StatusAnalytics {
+ /**
+ * Returns a ConnectionStatusAnalytics object containing all relevant metrics and analytical & statistical objects, as well as identity information for the connection.
+ *
+ * @param connectionId The unique ID of the connection
+ * @return A ConnectionStatusAnalytics object
+ */
ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId);
- public long getMinTimeToBackpressureMillis();
+ /**
+ * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
+ * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+ */
+ long getTimeToBytesBackpressureMillis();
+
+ /**
+ * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
+ * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
+ */
+ long getTimeToCountBackpressureMillis();
+
+ /**
+ * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+ * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+ */
+ long getNextIntervalBytes();
+
+ /**
+ * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
+ * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
+ */
+ int getNextIntervalCount();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
index e914f74..526bdcf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
@@ -35,7 +35,10 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
private String destinationId;
private String destinationName;
- private Long predictedMillisUntilBackpressure = 0L;
+ private Long predictedMillisUntilCountBackpressure = 0L;
+ private Long predictedMillisUntilBytesBackpressure = 0L;
+ private Integer predictedCountAtNextInterval = 0;
+ private Long predictedBytesAtNextInterval = 0L;
/* getters / setters */
/**
@@ -122,13 +125,40 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
this.destinationName = destinationName;
}
- @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied.")
- public Long getPredictedMillisUntilBackpressure() {
- return predictedMillisUntilBackpressure;
+ @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the queued count.")
+ public Long getPredictedMillisUntilCountBackpressure() {
+ return predictedMillisUntilCountBackpressure;
}
- public void setPredictedMillisUntilBackpressure(Long predictedMillisUntilBackpressure) {
- this.predictedMillisUntilBackpressure = predictedMillisUntilBackpressure;
+ public void setPredictedMillisUntilCountBackpressure(Long predictedMillisUntilCountBackpressure) {
+ this.predictedMillisUntilCountBackpressure = predictedMillisUntilCountBackpressure;
+ }
+
+ @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the total number of bytes in the queue.")
+ public Long getPredictedMillisUntilBytesBackpressure() {
+ return predictedMillisUntilBytesBackpressure;
+ }
+
+ public void setPredictedMillisUntilBytesBackpressure(Long predictedMillisUntilBytesBackpressure) {
+ this.predictedMillisUntilBytesBackpressure = predictedMillisUntilBytesBackpressure;
+ }
+
+ @ApiModelProperty("The predicted number of queued objects at the next configured interval.")
+ public Integer getPredictedCountAtNextInterval() {
+ return predictedCountAtNextInterval;
+ }
+
+ public void setPredictedCountAtNextInterval(Integer predictedCountAtNextInterval) {
+ this.predictedCountAtNextInterval = predictedCountAtNextInterval;
+ }
+
+ @ApiModelProperty("The predicted total number of bytes in the queue at the next configured interval.")
+ public Long getPredictedBytesAtNextInterval() {
+ return predictedBytesAtNextInterval;
+ }
+
+ public void setPredictedBytesAtNextInterval(Long predictedBytesAtNextInterval) {
+ this.predictedBytesAtNextInterval = predictedBytesAtNextInterval;
}
@Override
@@ -142,7 +172,10 @@ public class ConnectionStatisticsSnapshotDTO implements Cloneable {
other.setSourceId(getSourceId());
other.setSourceName(getSourceName());
- other.setPredictedMillisUntilBackpressure(getPredictedMillisUntilBackpressure());
+ other.setPredictedMillisUntilCountBackpressure(getPredictedMillisUntilCountBackpressure());
+ other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure());
+ other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval());
+ other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval());
return other;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
index 7ba93cc..3237385 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
@@ -48,6 +48,10 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
private String queuedCount;
private Integer percentUseCount;
private Integer percentUseBytes;
+ private Long predictedMillisUntilCountBackpressure = 0L;
+ private Long predictedMillisUntilBytesBackpressure = 0L;
+ private Integer predictedCountAtNextInterval = 0;
+ private Long predictedBytesAtNextInterval = 0L;
/* getters / setters */
/**
@@ -271,6 +275,42 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
this.percentUseBytes = percentUseBytes;
}
+ @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the queued count.")
+ public Long getPredictedMillisUntilCountBackpressure() {
+ return predictedMillisUntilCountBackpressure;
+ }
+
+ public void setPredictedMillisUntilCountBackpressure(Long predictedMillisUntilCountBackpressure) {
+ this.predictedMillisUntilCountBackpressure = predictedMillisUntilCountBackpressure;
+ }
+
+ @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied, based on the total number of bytes in the queue.")
+ public Long getPredictedMillisUntilBytesBackpressure() {
+ return predictedMillisUntilBytesBackpressure;
+ }
+
+ public void setPredictedMillisUntilBytesBackpressure(Long predictedMillisUntilBytesBackpressure) {
+ this.predictedMillisUntilBytesBackpressure = predictedMillisUntilBytesBackpressure;
+ }
+
+ @ApiModelProperty("The predicted number of queued objects at the next configured interval.")
+ public Integer getPredictedCountAtNextInterval() {
+ return predictedCountAtNextInterval;
+ }
+
+ public void setPredictedCountAtNextInterval(Integer predictedCountAtNextInterval) {
+ this.predictedCountAtNextInterval = predictedCountAtNextInterval;
+ }
+
+ @ApiModelProperty("The predicted total number of bytes in the queue at the next configured interval.")
+ public Long getPredictedBytesAtNextInterval() {
+ return predictedBytesAtNextInterval;
+ }
+
+ public void setPredictedBytesAtNextInterval(Long predictedBytesAtNextInterval) {
+ this.predictedBytesAtNextInterval = predictedBytesAtNextInterval;
+ }
+
@Override
public ConnectionStatusSnapshotDTO clone() {
final ConnectionStatusSnapshotDTO other = new ConnectionStatusSnapshotDTO();
@@ -295,6 +335,10 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
other.setQueuedSize(getQueuedSize());
other.setPercentUseBytes(getPercentUseBytes());
other.setPercentUseCount(getPercentUseCount());
+ other.setPredictedMillisUntilCountBackpressure(getPredictedMillisUntilCountBackpressure());
+ other.setPredictedMillisUntilBytesBackpressure(getPredictedMillisUntilBytesBackpressure());
+ other.setPredictedCountAtNextInterval(getPredictedCountAtNextInterval());
+ other.setPredictedBytesAtNextInterval(getPredictedBytesAtNextInterval());
return other;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
index da7e5ca..6f4eee5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
@@ -19,7 +19,6 @@ package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.ReadablePermission;
import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
-import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
/**
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 462b113..5f67b49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,39 +16,6 @@
*/
package org.apache.nifi.controller;
-import static java.util.Objects.requireNonNull;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import javax.net.ssl.SSLContext;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -217,6 +184,38 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
// default repository implementations
@@ -608,16 +607,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository);
- timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- try {
- analyticsEngine.refreshModel();
- } catch (final Exception e) {
- LOG.error("Failed to refresh model:", e);
- }
- }
- }, 1, 1, TimeUnit.MINUTES); //FIXME use a real/configured interval (or maybe just compute on the fly when requested)
this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 64c2065..024c138 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -113,10 +113,26 @@ public class StatusAnalyticEngine implements StatusAnalytics {
}
@Override
- public long getMinTimeToBackpressureMillis() {
+ public long getTimeToCountBackpressureMillis() {
return connTimeToBackpressure;
}
+ // TODO - populate the other prediction fields
+ @Override
+ public long getTimeToBytesBackpressureMillis() {
+ return 0;
+ }
+
+ @Override
+ public long getNextIntervalBytes() {
+ return 0;
+ }
+
+ @Override
+ public int getNextIntervalCount() {
+ return 0;
+ }
+
@Override
public String getId() {
return conn.getIdentifier();
@@ -139,7 +155,6 @@ public class StatusAnalyticEngine implements StatusAnalytics {
};
}
- @Override
public long getMinTimeToBackpressureMillis() {
ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
List<Connection> allConnections = rootGroup.findAllConnections();
@@ -148,10 +163,31 @@ public class StatusAnalyticEngine implements StatusAnalytics {
for (Connection conn : allConnections) {
ConnectionStatusAnalytics connAnalytics = getConnectionStatusAnalytics(conn);
- minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getMinTimeToBackpressureMillis());
+ minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getTimeToCountBackpressureMillis());
}
LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure));
return minTimeToBackpressure;
}
+
+ // TODO - populate the prediction fields. Do we need to pass in connection ID?
+ @Override
+ public long getTimeToCountBackpressureMillis() {
+ return 0;
+ }
+
+ @Override
+ public long getTimeToBytesBackpressureMillis() {
+ return 0;
+ }
+
+ @Override
+ public long getNextIntervalBytes() {
+ return 0;
+ }
+
+ @Override
+ public int getNextIntervalCount() {
+ return 0;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index 0b4b73c..f943856 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -51,6 +51,7 @@ import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
@@ -338,6 +339,14 @@ public class StandardEventAccess implements UserAwareEventAccess {
bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
}
+ final StatusAnalytics statusAnalytics = flowController.getStatusAnalytics();
+ if (statusAnalytics != null) {
+ connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis());
+ connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis());
+ connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes());
+ connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount());
+ }
+
if (isConnectionAuthorized) {
if (StringUtils.isNotBlank(conn.getName())) {
connStatus.setName(conn.getName());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 5afdd5a..2059c45 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1181,9 +1181,13 @@ public final class DtoFactory {
if (connectionStatus.getBackPressureObjectThreshold() > 0) {
snapshot.setPercentUseCount(Math.min(100, StatusMerger.getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold())));
+ snapshot.setPredictedMillisUntilCountBackpressure(connectionStatus.getPredictedTimeToCountBackpressureMillis());
+ snapshot.setPredictedCountAtNextInterval(connectionStatus.getNextPredictedQueuedCount());
}
if (connectionStatus.getBackPressureBytesThreshold() > 0) {
snapshot.setPercentUseBytes(Math.min(100, StatusMerger.getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold())));
+ snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatus.getPredictedTimeToBytesBackpressureMillis());
+ snapshot.setPredictedBytesAtNextInterval(connectionStatus.getNextPredictedQueuedBytes());
}
StatusMerger.updatePrettyPrintedFields(snapshot);
@@ -1211,7 +1215,10 @@ public final class DtoFactory {
snapshot.setSourceName(connectionStatistics.getSourceName());
snapshot.setDestinationName(connectionStatistics.getDestinationName());
- snapshot.setPredictedMillisUntilBackpressure(connectionStatistics.getMinTimeToBackpressureMillis());
+ snapshot.setPredictedMillisUntilBytesBackpressure(connectionStatistics.getTimeToBytesBackpressureMillis());
+ snapshot.setPredictedMillisUntilCountBackpressure(connectionStatistics.getTimeToCountBackpressureMillis());
+ snapshot.setPredictedBytesAtNextInterval(connectionStatistics.getNextIntervalBytes());
+ snapshot.setPredictedCountAtNextInterval(connectionStatistics.getNextIntervalCount());
return connectionStatisticsDTO;
}