You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/28 20:21:00 UTC

[nifi] 07/24: NIFI-6510 adjustments for interface updates, added call to StandardEventAccess, updated interface to use connection id

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 60f5a76229983dfa6374cb0ba181a2e6087d355f
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Tue Jul 23 01:38:06 2019 -0400

    NIFI-6510 adjustments for interface updates, added call to StandardEventAccess, updated interface to use connection id
    
    (cherry picked from commit 14854ff)
    
    DFA-9 - reduced snapshot interval to 1 minute
    
    (cherry picked from commit 36abb0a)
---
 .../status/analytics/StatusAnalytics.java          | 12 ++--
 .../org/apache/nifi/controller/FlowController.java | 74 +++++++++++-----------
 .../analytics/CachingStatusAnalyticEngine.java     | 70 +++++++++++++-------
 .../status/analytics/StatusAnalyticEngine.java     |  8 +--
 .../apache/nifi/reporting/StandardEventAccess.java | 11 ++--
 5 files changed, 102 insertions(+), 73 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index 7d29314..45e1c12 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -32,24 +32,28 @@ public interface StatusAnalytics {
     /**
      * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
      * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+     * @param connectionId identifier of the connection for which the bytes-based backpressure prediction is requested
      */
-    long getTimeToBytesBackpressureMillis();
+    long getTimeToBytesBackpressureMillis(String connectionId);
 
     /**
      * Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
      * @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
+     * @param connectionId identifier of the connection for which the object-count-based backpressure prediction is requested
      */
-    long getTimeToCountBackpressureMillis();
+    long getTimeToCountBackpressureMillis(String connectionId);
 
     /**
      * Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
      * @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+     * @param connectionId identifier of the connection for which the predicted number of queued bytes is requested
      */
-    long getNextIntervalBytes();
+    long getNextIntervalBytes(String connectionId);
 
     /**
      * Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
      * @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
+     * @param connectionId identifier of the connection for which the predicted number of queued objects is requested
      */
-    int getNextIntervalCount();
+    int getNextIntervalCount(String connectionId);
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 5f67b49..450d944 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,6 +16,39 @@
  */
 package org.apache.nifi.controller;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -184,38 +217,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
 public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
 
     // default repository implementations
@@ -350,7 +351,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     // guarded by rwLock
     private NodeConnectionStatus connectionStatus;
 
-    private CachingStatusAnalyticEngine analyticsEngine;
+    private StatusAnalytics analyticsEngine;
 
     // guarded by rwLock
     private String instanceId;
@@ -592,8 +593,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             zooKeeperStateServer = null;
         }
 
-        eventAccess = new StandardEventAccess(this, flowFileEventRepository);
         componentStatusRepository = createComponentStatusRepository();
+        analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository);
+        eventAccess = new StandardEventAccess(this, flowFileEventRepository);
+
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
@@ -605,9 +608,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
-        analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository);
-
-
         this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
index 5241c4a..864a5d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
@@ -2,6 +2,7 @@ package org.apache.nifi.controller.status.analytics;
 
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.math3.stat.regression.SimpleRegression;
 import org.apache.nifi.connectable.Connection;
@@ -28,6 +29,7 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
         this.controller = controller;
         this.statusRepository = statusRepository;
         this.cache = Caffeine.newBuilder()
+                .expireAfterWrite(1,TimeUnit.MINUTES)
                 .build();
     }
 
@@ -38,15 +40,37 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
         Connection connection = rootGroup.findConnection(connectionId);
         SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier());
 
-        if(cachedRegression != null) {
-            cache.put(connection.getIdentifier(), cachedRegression);
+        if(cachedRegression == null) {
+            cachedRegression = getBackPressureRegressionModel(connection);
+            if(cachedRegression != null)
+                cache.put(connection.getIdentifier(), cachedRegression);
         }
 
         ConnectionStatusAnalytics cachedResult = calculate(cachedRegression,connection);
-        LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getMinTimeToBackpressureMillis() );
+        LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getTimeToCountBackpressureMillis());
         return cachedResult;
     }
 
+    @Override
+    public long getTimeToBytesBackpressureMillis(String connectionId) {
+        return 0;
+    }
+
+    @Override
+    public long getTimeToCountBackpressureMillis(String connectionId) {
+        return getConnectionStatusAnalytics(connectionId).getTimeToCountBackpressureMillis();
+    }
+
+    @Override
+    public long getNextIntervalBytes(String connectionId) {
+        return 0;
+    }
+
+    @Override
+    public int getNextIntervalCount(String connectionId) {
+        return 0;
+    }
+
     protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn){
         long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
 
@@ -77,14 +101,30 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
                 return conn.getName();
             }
 
+
+            @Override
+            public String getId() {
+                return conn.getIdentifier();
+            }
+
             @Override
-            public long getMinTimeToBackpressureMillis() {
+            public long getTimeToBytesBackpressureMillis() {
+                return 0;
+            }
+
+            @Override
+            public long getTimeToCountBackpressureMillis() {
                 return connTimeToBackpressure;
             }
 
             @Override
-            public String getId() {
-                return conn.getIdentifier();
+            public long getNextIntervalBytes() {
+                return 0;
+            }
+
+            @Override
+            public int getNextIntervalCount() {
+                return 0;
             }
 
             @Override
@@ -111,7 +151,7 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
      * @return
      */
     protected SimpleRegression getBackPressureRegressionModel(Connection conn) {
-        Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
+        Date minDate = new Date(System.currentTimeMillis() - (60 * 1000));
         StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
                 statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
         List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
@@ -128,7 +168,6 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
                 Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
                 long snapTime = snap.getTimestamp().getTime();
                 regression.addData(snapTime, snapQueuedCount);
-                LOG.info("Connection " + conn.getIdentifier() + " statistics: ("+snapTime+","+snapQueuedCount+")");
             }
 
             if (regression.getSlope() <= 0 && !conn.getFlowFileQueue().isFull()) {
@@ -141,20 +180,5 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
 
     }
 
-    public void refreshModel() {
-        ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
-        List<Connection> allConnections = rootGroup.findAllConnections();
-        cache.invalidateAll();
-        for (Connection conn : allConnections) {
-            SimpleRegression regression = getBackPressureRegressionModel(conn);
-            if(regression != null) {
-                cache.put(conn.getIdentifier(), regression);
-            }
-        }
-    }
 
-    @Override
-    public long getMinTimeToBackpressureMillis() {
-        return 0;
-    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 024c138..5a873d5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -172,22 +172,22 @@ public class StatusAnalyticEngine implements StatusAnalytics {
 
     // TODO - populate the prediction fields. Do we need to pass in connection ID?
     @Override
-    public long getTimeToCountBackpressureMillis() {
+    public long getTimeToCountBackpressureMillis(String connectionId) {
         return 0;
     }
 
     @Override
-    public long getTimeToBytesBackpressureMillis() {
+    public long getTimeToBytesBackpressureMillis(String connectionId) {
         return 0;
     }
 
     @Override
-    public long getNextIntervalBytes() {
+    public long getNextIntervalBytes(String connectionId) {
         return 0;
     }
 
     @Override
-    public int getNextIntervalCount() {
+    public int getNextIntervalCount(String connectionId) {
         return 0;
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index f943856..87fcd4d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -65,10 +65,12 @@ import org.apache.nifi.remote.RemoteGroupPort;
 public class StandardEventAccess implements UserAwareEventAccess {
     private final FlowFileEventRepository flowFileEventRepository;
     private final FlowController flowController;
+    private final StatusAnalytics statusAnalytics;
 
     public StandardEventAccess(final FlowController flowController, final FlowFileEventRepository flowFileEventRepository) {
         this.flowController = flowController;
         this.flowFileEventRepository = flowFileEventRepository;
+        this.statusAnalytics = flowController.getStatusAnalytics();
     }
 
     /**
@@ -339,12 +341,11 @@ public class StandardEventAccess implements UserAwareEventAccess {
                 bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
             }
 
-            final StatusAnalytics statusAnalytics = flowController.getStatusAnalytics();
             if (statusAnalytics != null) {
-                connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis());
-                connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis());
-                connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes());
-                connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount());
+                connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis(conn.getIdentifier()));
+                connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis(conn.getIdentifier()));
+                connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes(conn.getIdentifier()));
+                connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount(conn.getIdentifier()));
             }
 
             if (isConnectionAuthorized) {