You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/28 20:20:58 UTC
[nifi] 05/24: NIFI-6510 Added poc engine with prediction model caching
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 4daddb16646a8a1f57536d3b2fb74b8e9e228405
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Fri Jul 19 17:28:29 2019 -0400
NIFI-6510 Added poc engine with prediction model caching
(cherry picked from commit e013b91)
DFA-9 - updated logging and corrected logic for checking if not in backpressure
(cherry picked from commit a1f8e70)
---
.../nifi-framework/nifi-framework-core/pom.xml | 6 +
.../org/apache/nifi/controller/FlowController.java | 77 +++++-----
.../analytics/CachingStatusAnalyticEngine.java | 160 +++++++++++++++++++++
.../apache/nifi/reporting/StandardEventAccess.java | 14 +-
4 files changed, 212 insertions(+), 45 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 6551d54..2d17086 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -241,6 +241,12 @@
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>1.0.1</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 56272ff..462b113 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,6 +16,39 @@
*/
package org.apache.nifi.controller;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -117,7 +150,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
-import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine;
+import org.apache.nifi.controller.status.analytics.CachingStatusAnalyticEngine;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
@@ -184,38 +217,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
// default repository implementations
@@ -350,7 +351,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// guarded by rwLock
private NodeConnectionStatus connectionStatus;
- private StatusAnalyticEngine analyticsEngine;
+ private CachingStatusAnalyticEngine analyticsEngine;
// guarded by rwLock
private String instanceId;
@@ -605,18 +606,18 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
- analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
+ analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
- analyticsEngine.getMinTimeToBackpressureMillis();
+ analyticsEngine.refreshModel();
} catch (final Exception e) {
- LOG.error("Failed to capture component stats for Stats History", e);
+ LOG.error("Failed to refresh model:", e);
}
}
- }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval (or maybe just compute on the fly when requested)
+ }, 1, 1, TimeUnit.MINUTES); //FIXME use a real/configured interval (or maybe just compute on the fly when requested)
this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
new file mode 100644
index 0000000..5241c4a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
@@ -0,0 +1,160 @@
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.math3.stat.regression.SimpleRegression;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+public class CachingStatusAnalyticEngine implements StatusAnalytics {
+ private ComponentStatusRepository statusRepository;
+ private FlowController controller;
+ private volatile Cache<String, SimpleRegression> cache;
+ private static final Logger LOG = LoggerFactory.getLogger(StatusAnalyticEngine.class);
+
+ public CachingStatusAnalyticEngine(FlowController controller, ComponentStatusRepository statusRepository) {
+ this.controller = controller;
+ this.statusRepository = statusRepository;
+ this.cache = Caffeine.newBuilder()
+ .build();
+ }
+
+ @Override
+ public ConnectionStatusAnalytics getConnectionStatusAnalytics(String connectionId) {
+
+ ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+ Connection connection = rootGroup.findConnection(connectionId);
+ SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier());
+
+ if(cachedRegression != null) {
+ cache.put(connection.getIdentifier(), cachedRegression);
+ }
+
+ ConnectionStatusAnalytics cachedResult = calculate(cachedRegression,connection);
+ LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getMinTimeToBackpressureMillis() );
+ return cachedResult;
+ }
+
+ protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn){
+ long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
+
+ final long connTimeToBackpressure;
+
+ if(regression == null){
+ connTimeToBackpressure = Long.MAX_VALUE;
+ }else{
+ //If calculation returns as negative only 0 will return
+ connTimeToBackpressure = Math.max(0, Math.round((backPressureObjectThreshold - regression.getIntercept()) / regression.getSlope())
+ - System.currentTimeMillis());
+ }
+
+ return new ConnectionStatusAnalytics() {
+
+ @Override
+ public String getSourceName() {
+ return conn.getSource().getName();
+ }
+
+ @Override
+ public String getSourceId() {
+ return conn.getSource().getIdentifier();
+ }
+
+ @Override
+ public String getName() {
+ return conn.getName();
+ }
+
+ @Override
+ public long getMinTimeToBackpressureMillis() {
+ return connTimeToBackpressure;
+ }
+
+ @Override
+ public String getId() {
+ return conn.getIdentifier();
+ }
+
+ @Override
+ public String getGroupId() {
+ return conn.getProcessGroup().getIdentifier();
+ }
+
+ @Override
+ public String getDestinationName() {
+ return conn.getDestination().getName();
+ }
+
+ @Override
+ public String getDestinationId() {
+ return conn.getDestination().getIdentifier();
+ }
+ };
+
+ }
+
+ /**
+ * Get backpressure model based on current data
+ * @param conn the connection to run the analytic on
+ * @return
+ */
+ protected SimpleRegression getBackPressureRegressionModel(Connection conn) {
+ Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
+ StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
+ statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
+ List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
+
+ if (aggregateSnapshots.size() < 2) {
+ LOG.info("Not enough data to model time to backpressure.");
+ return null;
+ } else {
+
+ ConnectionStatusDescriptor.QUEUED_COUNT.getField();
+ SimpleRegression regression = new SimpleRegression();
+
+ for (StatusSnapshotDTO snap : aggregateSnapshots) {
+ Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
+ long snapTime = snap.getTimestamp().getTime();
+ regression.addData(snapTime, snapQueuedCount);
+ LOG.info("Connection " + conn.getIdentifier() + " statistics: ("+snapTime+","+snapQueuedCount+")");
+ }
+
+ if (regression.getSlope() <= 0 && !conn.getFlowFileQueue().isFull()) {
+ LOG.info("Connection " + conn.getIdentifier() + " is not experiencing backpressure.");
+ return null;
+ } else {
+ return regression;
+ }
+ }
+
+ }
+
+ public void refreshModel() {
+ ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+ List<Connection> allConnections = rootGroup.findAllConnections();
+ cache.invalidateAll();
+ for (Connection conn : allConnections) {
+ SimpleRegression regression = getBackPressureRegressionModel(conn);
+ if(regression != null) {
+ cache.put(conn.getIdentifier(), regression);
+ }
+ }
+ }
+
+ @Override
+ public long getMinTimeToBackpressureMillis() {
+ return 0;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index 095ddf8..0b4b73c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -16,6 +16,13 @@
*/
package org.apache.nifi.reporting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@@ -54,13 +61,6 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
public class StandardEventAccess implements UserAwareEventAccess {
private final FlowFileEventRepository flowFileEventRepository;
private final FlowController flowController;