You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/19 13:53:41 UTC

[nifi] 05/21: NIFI-6510 Added poc engine with prediction model caching

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit bd011f3e27eef4d5fa620b8c2a5f3eccf7990492
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Fri Jul 19 17:28:29 2019 -0400

    NIFI-6510 Added poc engine with prediction model caching
    
    (cherry picked from commit e013b91)
    
    DFA-9 - updated logging and corrected logic for checking if not in backpressure
    
    (cherry picked from commit a1f8e70)
---
 .../nifi-framework/nifi-framework-core/pom.xml     |   6 +
 .../org/apache/nifi/controller/FlowController.java |  77 +++++-----
 .../analytics/CachingStatusAnalyticEngine.java     | 160 +++++++++++++++++++++
 .../apache/nifi/reporting/StandardEventAccess.java |  14 +-
 4 files changed, 212 insertions(+), 45 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 6551d54..2d17086 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -241,6 +241,12 @@
             <version>1.10.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>1.0.1</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 56272ff..462b113 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,6 +16,39 @@
  */
 package org.apache.nifi.controller;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -117,7 +150,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
-import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine;
+import org.apache.nifi.controller.status.analytics.CachingStatusAnalyticEngine;
 import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
@@ -184,38 +217,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
 public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
 
     // default repository implementations
@@ -350,7 +351,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     // guarded by rwLock
     private NodeConnectionStatus connectionStatus;
 
-    private StatusAnalyticEngine analyticsEngine;
+    private CachingStatusAnalyticEngine analyticsEngine;
 
     // guarded by rwLock
     private String instanceId;
@@ -605,18 +606,18 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
-        analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
+        analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository);
 
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
                 try {
-                    analyticsEngine.getMinTimeToBackpressureMillis();
+                    analyticsEngine.refreshModel();
                 } catch (final Exception e) {
-                    LOG.error("Failed to capture component stats for Stats History", e);
+                    LOG.error("Failed to refresh model:", e);
                 }
             }
-        }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval (or maybe just compute on the fly when requested)
+        }, 1, 1, TimeUnit.MINUTES); //FIXME use a real/configured interval (or maybe just compute on the fly when requested)
 
         this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
new file mode 100644
index 0000000..5241c4a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
@@ -0,0 +1,160 @@
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.math3.stat.regression.SimpleRegression;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+public class CachingStatusAnalyticEngine implements StatusAnalytics {
+    private static final Logger LOG = LoggerFactory.getLogger(CachingStatusAnalyticEngine.class);
+    private static final long MODEL_WINDOW_MILLIS = 5 * 60 * 1000; // status history window used to fit each model
+    private final ComponentStatusRepository statusRepository;
+    private final FlowController controller;
+    // connection id -> queued-count regression; an absent entry means no model is available
+    private final Cache<String, SimpleRegression> cache;
+
+    public CachingStatusAnalyticEngine(FlowController controller, ComponentStatusRepository statusRepository) {
+        this.controller = controller;
+        this.statusRepository = statusRepository;
+        this.cache = Caffeine.newBuilder().build();
+    }
+
+    /**
+     * Returns analytics for a connection using the model cached by the last {@link #refreshModel()};
+     * a connection without a cached model is reported as having no predicted backpressure unless its queue is full.
+     * @throws IllegalArgumentException if no connection with the given identifier exists
+     */
+    @Override
+    public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) {
+        Connection connection = controller.getFlowManager().getRootGroup().findConnection(connectionId);
+        if (connection == null) {
+            throw new IllegalArgumentException("Unable to find connection with ID " + connectionId);
+        }
+        ConnectionStatusAnalytics cachedResult = calculate(cache.getIfPresent(connection.getIdentifier()), connection);
+        LOG.debug("Connection: {} Cached backpressure Time: {}", connectionId, cachedResult.getMinTimeToBackpressureMillis());
+        return cachedResult;
+    }
+
+    /**
+     * Builds analytics from a (possibly null) queued-count model: 0 if the queue is currently full,
+     * otherwise the predicted millis until the object threshold is reached, or Long.MAX_VALUE if none is predicted.
+     */
+    protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn) {
+        final long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
+        final long connTimeToBackpressure;
+        if (conn.getFlowFileQueue().isFull()) {
+            connTimeToBackpressure = 0L;
+        } else if (regression == null || backPressureObjectThreshold <= 0 || !(regression.getSlope() > 0)) {
+            // no model, no object-count threshold, or a flat/shrinking/NaN slope: nothing to predict
+            connTimeToBackpressure = Long.MAX_VALUE;
+        } else {
+            final double predictedCrossingMillis = (backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope();
+            // subtract as doubles so Math.round clamps instead of overflowing; a past crossing becomes 0
+            connTimeToBackpressure = Math.max(0L, Math.round(predictedCrossingMillis - System.currentTimeMillis()));
+        }
+
+        return new ConnectionStatusAnalytics() {
+            @Override
+            public String getSourceName() {
+                return conn.getSource().getName();
+            }
+            @Override
+            public String getSourceId() {
+                return conn.getSource().getIdentifier();
+            }
+            @Override
+            public String getName() {
+                return conn.getName();
+            }
+            @Override
+            public long getMinTimeToBackpressureMillis() {
+                return connTimeToBackpressure;
+            }
+            @Override
+            public String getId() {
+                return conn.getIdentifier();
+            }
+            @Override
+            public String getGroupId() {
+                return conn.getProcessGroup().getIdentifier();
+            }
+            @Override
+            public String getDestinationName() {
+                return conn.getDestination().getName();
+            }
+            @Override
+            public String getDestinationId() {
+                return conn.getDestination().getIdentifier();
+            }
+        };
+    }
+
+    /**
+     * Fits queued FlowFile count against snapshot epoch-millis timestamps from the recent status history.
+     * @param conn the connection to run the analytic on
+     * @return the regression, or null if fewer than two samples exist or the count is neither growing nor at backpressure
+     */
+    protected SimpleRegression getBackPressureRegressionModel(Connection conn) {
+        Date minDate = new Date(System.currentTimeMillis() - MODEL_WINDOW_MILLIS);
+        StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
+                statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
+        SimpleRegression regression = new SimpleRegression();
+        for (StatusSnapshotDTO snap : connHistory.getAggregateSnapshots()) {
+            Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
+            if (snapQueuedCount == null) {
+                continue; // metric absent from this snapshot; unboxing it would throw
+            }
+            long snapTime = snap.getTimestamp().getTime();
+            regression.addData(snapTime, snapQueuedCount);
+            LOG.debug("Connection {} statistics: ({},{})", conn.getIdentifier(), snapTime, snapQueuedCount);
+        }
+        if (regression.getN() < 2) {
+            LOG.debug("Not enough data to model time to backpressure.");
+            return null;
+        }
+        if (!(regression.getSlope() > 0) && !conn.getFlowFileQueue().isFull()) {
+            LOG.debug("Connection {} is not experiencing backpressure.", conn.getIdentifier());
+            return null;
+        }
+        return regression;
+    }
+
+    /** Recomputes every connection's model, updating the cache in place so readers never observe it emptied. */
+    public void refreshModel() {
+        ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+        List<Connection> allConnections = rootGroup.findAllConnections();
+        for (Connection conn : allConnections) {
+            SimpleRegression regression = getBackPressureRegressionModel(conn);
+            if (regression == null) {
+                cache.invalidate(conn.getIdentifier());
+            } else {
+                cache.put(conn.getIdentifier(), regression);
+            }
+        }
+        // drop models of connections removed from the flow since the previous refresh
+        cache.asMap().keySet().removeIf(id -> rootGroup.findConnection(id) == null);
+    }
+
+    /** Returns the smallest cached time to backpressure across all connections, or Long.MAX_VALUE if there are none. */
+    @Override
+    public long getMinTimeToBackpressureMillis() {
+        long min = Long.MAX_VALUE;
+        for (Connection conn : controller.getFlowManager().getRootGroup().findAllConnections()) {
+            min = Math.min(min, calculate(cache.getIfPresent(conn.getIdentifier()), conn).getMinTimeToBackpressureMillis());
+        }
+        return min;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index 095ddf8..0b4b73c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -16,6 +16,13 @@
  */
 package org.apache.nifi.reporting;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.collections4.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -54,13 +61,6 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.remote.PublicPort;
 import org.apache.nifi.remote.RemoteGroupPort;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 public class StandardEventAccess implements UserAwareEventAccess {
     private final FlowFileEventRepository flowFileEventRepository;
     private final FlowController flowController;