You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/19 13:53:40 UTC
[nifi] 04/21: NIFI-6510 Connect the dots for StatusAnalytics -> API
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit be0993cb6f759784cb51ce67b32c534d8cd2cdf1
Author: Andrew I. Christianson <an...@andyic.org>
AuthorDate: Tue Jul 16 15:57:14 2019 -0400
NIFI-6510 Connect the dots for StatusAnalytics -> API
---
...alytics.java => ConnectionStatusAnalytics.java} | 9 +-
.../status/analytics/StatusAnalytics.java | 8 +-
.../org/apache/nifi/controller/FlowController.java | 10 +-
.../status/analytics/StatusAnalyticEngine.java | 106 ++++++++++++++++-----
.../org/apache/nifi/web/api/dto/DtoFactory.java | 4 +-
.../nifi/web/controller/ControllerFacade.java | 9 +-
6 files changed, 114 insertions(+), 32 deletions(-)
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
similarity index 79%
copy from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
copy to nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index d6ad3bc..12c8a15 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -16,6 +16,13 @@
*/
package org.apache.nifi.controller.status.analytics;
-public interface StatusAnalytics {
+public interface ConnectionStatusAnalytics {
long getMinTimeToBackpressureMillis();
+ String getGroupId();
+ String getId();
+ String getName();
+ String getSourceId();
+ String getSourceName();
+ String getDestinationId();
+ String getDestinationName();
}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index d6ad3bc..42c2abd 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -16,6 +16,12 @@
*/
package org.apache.nifi.controller.status.analytics;
+/**
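+ * Exposes backpressure predictions: per-connection analytics looked up by connection id, and the minimum time to backpressure.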
+ */
public interface StatusAnalytics {
- long getMinTimeToBackpressureMillis();
+
+ ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId);
+
+ public long getMinTimeToBackpressureMillis();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 0c422b4..56272ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -350,6 +350,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// guarded by rwLock
private NodeConnectionStatus connectionStatus;
+ private StatusAnalyticEngine analyticsEngine;
+
// guarded by rwLock
private String instanceId;
@@ -603,7 +605,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
- StatusAnalytics analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
+ analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
@@ -614,7 +616,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
LOG.error("Failed to capture component stats for Stats History", e);
}
}
- }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval
+ }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval (or maybe just compute on the fly when requested)
this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
@@ -1387,6 +1389,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return eventAccess;
}
+ public StatusAnalytics getStatusAnalytics() {
+ return analyticsEngine;
+ }
+
/**
* Sets the root group to the given group
*
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 9231707..64c2065 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -43,22 +43,28 @@ public class StatusAnalyticEngine implements StatusAnalytics {
}
@Override
- public long getMinTimeToBackpressureMillis() {
+ public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) {
ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
- List<Connection> allConnections = rootGroup.findAllConnections();
- long minTimeToBackpressure = Long.MAX_VALUE;
+ return getConnectionStatusAnalytics(rootGroup.findConnection(connectionId));
+ }
- for (Connection conn : allConnections) {
- LOG.info("Getting connection history for: " + conn.getIdentifier());
- Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
- StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(statusRepository
- .getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
- List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
-
- if (aggregateSnapshots.size() < 2) {
- LOG.info("Not enough data to model time to backpressure.");
- continue;
- }
+ /**
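+ * Builds the status analytics for the given connection, including the predicted number of millis until it will experience backpressure.
+ * @param conn the connection to run the analytic on
+ * @return the analytics for the given connection; its time to backpressure is Long.MAX_VALUE when there are fewer than two history snapshots or the regression slope is not positive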
+ */
+ public ConnectionStatusAnalytics getConnectionStatusAnalytics(Connection conn) {
+ LOG.info("Getting connection history for: " + conn.getIdentifier());
+ long connTimeToBackpressure;
+ Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
+ StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
+ statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
+ List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
+
+ if (aggregateSnapshots.size() < 2) {
+ LOG.info("Not enough data to model time to backpressure.");
+ connTimeToBackpressure = Long.MAX_VALUE;
+ } else {
long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is "
@@ -77,16 +83,72 @@ public class StatusAnalyticEngine implements StatusAnalytics {
// Skip this connection if its queue is declining.
if (regression.getSlope() <= 0) {
LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
- continue;
+ connTimeToBackpressure = Long.MAX_VALUE;
+ } else {
+
+ // Compute time-to backpressure for this connection; the minimum across all
+ // connections is taken separately in getMinTimeToBackpressureMillis().
+ connTimeToBackpressure = Math
+ .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
+ - System.currentTimeMillis();
+ LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
+ }
+ }
+
+ return new ConnectionStatusAnalytics() {
+
+ @Override
+ public String getSourceName() {
+ return conn.getSource().getName();
+ }
+
+ @Override
+ public String getSourceId() {
+ return conn.getSource().getIdentifier();
+ }
+
+ @Override
+ public String getName() {
+ return conn.getName();
+ }
+
+ @Override
+ public long getMinTimeToBackpressureMillis() {
+ return connTimeToBackpressure;
+ }
+
+ @Override
+ public String getId() {
+ return conn.getIdentifier();
+ }
+
+ @Override
+ public String getGroupId() {
+ return conn.getProcessGroup().getIdentifier();
}
- // Compute time-to backpressure for this connection; Reduce total result iff
- // this connection is lower.
- long connTimeToBackpressure = Math
- .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
- - System.currentTimeMillis();
- LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
- minTimeToBackpressure = Math.min(minTimeToBackpressure, connTimeToBackpressure);
+ @Override
+ public String getDestinationName() {
+ return conn.getDestination().getName();
+ }
+
+ @Override
+ public String getDestinationId() {
+ return conn.getDestination().getIdentifier();
+ }
+ };
+ }
+
+ @Override
+ public long getMinTimeToBackpressureMillis() {
+ ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+ List<Connection> allConnections = rootGroup.findAllConnections();
+ rootGroup.findConnection("asdf");
+ long minTimeToBackpressure = Long.MAX_VALUE;
+
+ for (Connection conn : allConnections) {
+ ConnectionStatusAnalytics connAnalytics = getConnectionStatusAnalytics(conn);
+ minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getMinTimeToBackpressureMillis());
}
LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6903e44..b8fed37 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -105,7 +105,7 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.analytics.StatusAnalytics;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.diagnostics.GarbageCollection;
@@ -1189,7 +1189,7 @@ public final class DtoFactory {
return connectionStatusDto;
}
- public ConnectionStatisticsDTO createConnectionStatisticsDto(final StatusAnalytics connectionStatistics) {
+ public ConnectionStatisticsDTO createConnectionStatisticsDto(final ConnectionStatusAnalytics connectionStatistics) {
final ConnectionStatisticsDTO connectionStatisticsDTO = new ConnectionStatisticsDTO();
connectionStatisticsDTO.setGroupId(connectionStatistics.getGroupId());
connectionStatisticsDTO.setId(connectionStatistics.getId());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index c1b6754..367ea51 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -56,6 +56,7 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.diagnostics.SystemDiagnostics;
@@ -687,7 +688,7 @@ public class ControllerFacade implements Authorizable {
* @param connectionId connection id
* @return the statistics for the specified connection
*/
- public StatusAnalytics getConnectionStatistics(final String connectionId) {
+ public ConnectionStatusAnalytics getConnectionStatistics(final String connectionId) {
final ProcessGroup root = getRootGroup();
final Connection connection = root.findConnection(connectionId);
@@ -703,13 +704,13 @@ public class ControllerFacade implements Authorizable {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
- // TODO get from flow controller
- final StatusAnalytics status;
+ // get from flow controller
+ final StatusAnalytics status = flowController.getStatusAnalytics();
if (status == null) {
throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
}
- return status;
+ return status.getConnectionStatusAnalytics(connectionId);
}
/**