You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/19 13:53:40 UTC

[nifi] 04/21: NIFI-6510 Connect the dots for StatusAnalytics -> API

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit be0993cb6f759784cb51ce67b32c534d8cd2cdf1
Author: Andrew I. Christianson <an...@andyic.org>
AuthorDate: Tue Jul 16 15:57:14 2019 -0400

    NIFI-6510 Connect the dots for StatusAnalytics -> API
---
 ...alytics.java => ConnectionStatusAnalytics.java} |   9 +-
 .../status/analytics/StatusAnalytics.java          |   8 +-
 .../org/apache/nifi/controller/FlowController.java |  10 +-
 .../status/analytics/StatusAnalyticEngine.java     | 106 ++++++++++++++++-----
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |   4 +-
 .../nifi/web/controller/ControllerFacade.java      |   9 +-
 6 files changed, 114 insertions(+), 32 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
similarity index 79%
copy from nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
copy to nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index d6ad3bc..12c8a15 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -16,6 +16,13 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
-public interface StatusAnalytics {
+public interface ConnectionStatusAnalytics {
     long getMinTimeToBackpressureMillis();
+    String getGroupId();
+    String getId();
+    String getName();
+    String getSourceId();
+    String getSourceName();
+    String getDestinationId();
+    String getDestinationName();
 }
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index d6ad3bc..42c2abd 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -16,6 +16,12 @@
  */
 package org.apache.nifi.controller.status.analytics;
 
+/**
+ * Provides status analytics for the flow: per-connection analytics and the minimum predicted time to backpressure.
+ */
 public interface StatusAnalytics {
-    long getMinTimeToBackpressureMillis();
+
+    ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId);
+
+    public long getMinTimeToBackpressureMillis();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 0c422b4..56272ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -350,6 +350,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     // guarded by rwLock
     private NodeConnectionStatus connectionStatus;
 
+    private StatusAnalyticEngine analyticsEngine;
+
     // guarded by rwLock
     private String instanceId;
 
@@ -603,7 +605,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
-        StatusAnalytics analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
+        analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
 
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
             @Override
@@ -614,7 +616,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
                     LOG.error("Failed to capture component stats for Stats History", e);
                 }
             }
-        }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval
+        }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval (or maybe just compute on the fly when requested)
 
         this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
@@ -1387,6 +1389,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         return eventAccess;
     }
 
+    public StatusAnalytics getStatusAnalytics() {
+        return analyticsEngine;
+    }
+
     /**
      * Sets the root group to the given group
      *
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 9231707..64c2065 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -43,22 +43,28 @@ public class StatusAnalyticEngine implements StatusAnalytics {
     }
 
     @Override
-    public long getMinTimeToBackpressureMillis() {
+    public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) {
         ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
-        List<Connection> allConnections = rootGroup.findAllConnections();
-        long minTimeToBackpressure = Long.MAX_VALUE;
+        return getConnectionStatusAnalytics(rootGroup.findConnection(connectionId));
+    }
 
-        for (Connection conn : allConnections) {
-            LOG.info("Getting connection history for: " + conn.getIdentifier());
-            Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
-            StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(statusRepository
-                    .getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
-            List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
-
-            if (aggregateSnapshots.size() < 2) {
-                LOG.info("Not enough data to model time to backpressure.");
-                continue;
-            }
+    /**
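+     * Builds the status analytics for the given connection, including the predicted number of millis until it will experience backpressure.
+     * @param conn the connection to run the analytic on
+     * @return the connection's analytics; time to backpressure is Long.MAX_VALUE when there are fewer than two snapshots or the queue is not growing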
+     */
+    public ConnectionStatusAnalytics getConnectionStatusAnalytics(Connection conn) {
+        LOG.info("Getting connection history for: " + conn.getIdentifier());
+        long connTimeToBackpressure;
+        Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
+        StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
+                statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
+        List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
+
+        if (aggregateSnapshots.size() < 2) {
+            LOG.info("Not enough data to model time to backpressure.");
+            connTimeToBackpressure = Long.MAX_VALUE;
+        } else {
 
             long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
             LOG.info("Connection " + conn.getIdentifier() + " backpressure object threshold is "
@@ -77,16 +83,72 @@ public class StatusAnalyticEngine implements StatusAnalytics {
             // Skip this connection if its queue is declining.
             if (regression.getSlope() <= 0) {
                 LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
-                continue;
+                connTimeToBackpressure = Long.MAX_VALUE;
+            } else {
+
+                // Compute time-to-backpressure for this connection; the minimum across all
+                // connections is taken by getMinTimeToBackpressureMillis().
+                connTimeToBackpressure = Math
+                        .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
+                        - System.currentTimeMillis();
+                LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
+            }
+        }
+
+        return new ConnectionStatusAnalytics() {
+
+            @Override
+            public String getSourceName() {
+                return conn.getSource().getName();
+            }
+
+            @Override
+            public String getSourceId() {
+                return conn.getSource().getIdentifier();
+            }
+
+            @Override
+            public String getName() {
+                return conn.getName();
+            }
+
+            @Override
+            public long getMinTimeToBackpressureMillis() {
+                return connTimeToBackpressure;
+            }
+
+            @Override
+            public String getId() {
+                return conn.getIdentifier();
+            }
+
+            @Override
+            public String getGroupId() {
+                return conn.getProcessGroup().getIdentifier();
             }
 
-            // Compute time-to backpressure for this connection; Reduce total result iff
-            // this connection is lower.
-            long connTimeToBackpressure = Math
-                    .round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
-                    - System.currentTimeMillis();
-            LOG.info("Connection " + conn.getIdentifier() + " time to backpressure is " + connTimeToBackpressure);
-            minTimeToBackpressure = Math.min(minTimeToBackpressure, connTimeToBackpressure);
+            @Override
+            public String getDestinationName() {
+                return conn.getDestination().getName();
+            }
+
+            @Override
+            public String getDestinationId() {
+                return conn.getDestination().getIdentifier();
+            }
+        };
+    }
+
+    @Override
+    public long getMinTimeToBackpressureMillis() {
+        ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+        List<Connection> allConnections = rootGroup.findAllConnections();
+        rootGroup.findConnection("asdf");
+        long minTimeToBackpressure = Long.MAX_VALUE;
+
+        for (Connection conn : allConnections) {
+            ConnectionStatusAnalytics connAnalytics = getConnectionStatusAnalytics(conn);
+            minTimeToBackpressure = Math.min(minTimeToBackpressure, connAnalytics.getMinTimeToBackpressureMillis());
         }
 
         LOG.info("Min time to backpressure is: " + Long.toString(minTimeToBackpressure));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 6903e44..b8fed37 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -105,7 +105,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.analytics.StatusAnalytics;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
 import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
 import org.apache.nifi.diagnostics.GarbageCollection;
@@ -1189,7 +1189,7 @@ public final class DtoFactory {
         return connectionStatusDto;
     }
 
-    public ConnectionStatisticsDTO createConnectionStatisticsDto(final StatusAnalytics connectionStatistics) {
+    public ConnectionStatisticsDTO createConnectionStatisticsDto(final ConnectionStatusAnalytics connectionStatistics) {
         final ConnectionStatisticsDTO connectionStatisticsDTO = new ConnectionStatisticsDTO();
         connectionStatisticsDTO.setGroupId(connectionStatistics.getGroupId());
         connectionStatisticsDTO.setId(connectionStatistics.getId());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index c1b6754..367ea51 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -56,6 +56,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
 import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
@@ -687,7 +688,7 @@ public class ControllerFacade implements Authorizable {
      * @param connectionId connection id
      * @return the statistics for the specified connection
      */
-    public StatusAnalytics getConnectionStatistics(final String connectionId) {
+    public ConnectionStatusAnalytics getConnectionStatistics(final String connectionId) {
         final ProcessGroup root = getRootGroup();
         final Connection connection = root.findConnection(connectionId);
 
@@ -703,13 +704,13 @@ public class ControllerFacade implements Authorizable {
             throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
         }
 
-        // TODO get from flow controller
-        final StatusAnalytics status;
+        // get from flow controller
+        final StatusAnalytics status = flowController.getStatusAnalytics();
         if (status == null) {
             throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
         }
 
-        return status;
+        return status.getConnectionStatusAnalytics(connectionId);
     }
 
     /**