You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/28 20:21:00 UTC
[nifi] 07/24: NIFI-6510 adjustments for interface updates,
added call to StandardEventAccess, updated interface to use connection id
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 60f5a76229983dfa6374cb0ba181a2e6087d355f
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Tue Jul 23 01:38:06 2019 -0400
NIFI-6510 adjustments for interface updates, added call to StandardEventAccess, updated interface to use connection id
(cherry picked from commit 14854ff)
DFA-9 - reduced snapshot interval to 1 minute
(cherry picked from commit 36abb0a)
---
.../status/analytics/StatusAnalytics.java | 12 ++--
.../org/apache/nifi/controller/FlowController.java | 74 +++++++++++-----------
.../analytics/CachingStatusAnalyticEngine.java | 70 +++++++++++++-------
.../status/analytics/StatusAnalyticEngine.java | 8 +--
.../apache/nifi/reporting/StandardEventAccess.java | 11 ++--
5 files changed, 102 insertions(+), 73 deletions(-)
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
index 7d29314..45e1c12 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -32,24 +32,28 @@ public interface StatusAnalytics {
/**
* Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the total number of bytes in the queue.
* @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+ * @param connectionId
*/
- long getTimeToBytesBackpressureMillis();
+ long getTimeToBytesBackpressureMillis(String connectionId);
/**
* Returns the predicted time (in milliseconds) when backpressure is expected to be applied to this connection, based on the number of objects in the queue.
* @return milliseconds until backpressure is predicted to occur, based on the number of objects in the queue.
+ * @param connectionId
*/
- long getTimeToCountBackpressureMillis();
+ long getTimeToCountBackpressureMillis(String connectionId);
/**
* Returns the predicted total number of bytes in the queue to occur at the next configured interval (5 mins in the future, e.g.).
* @return milliseconds until backpressure is predicted to occur, based on the total number of bytes in the queue.
+ * @param connectionId
*/
- long getNextIntervalBytes();
+ long getNextIntervalBytes(String connectionId);
/**
* Returns the predicted number of objects in the queue to occur at the next configured interval (5 mins in the future, e.g.).
* @return milliseconds until backpressure is predicted to occur, based on the number of bytes in the queue.
+ * @param connectionId
*/
- int getNextIntervalCount();
+ int getNextIntervalCount(String connectionId);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 5f67b49..450d944 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,6 +16,39 @@
*/
package org.apache.nifi.controller;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -184,38 +217,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
// default repository implementations
@@ -350,7 +351,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// guarded by rwLock
private NodeConnectionStatus connectionStatus;
- private CachingStatusAnalyticEngine analyticsEngine;
+ private StatusAnalytics analyticsEngine;
// guarded by rwLock
private String instanceId;
@@ -592,8 +593,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
zooKeeperStateServer = null;
}
- eventAccess = new StandardEventAccess(this, flowFileEventRepository);
componentStatusRepository = createComponentStatusRepository();
+ analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository);
+ eventAccess = new StandardEventAccess(this, flowFileEventRepository);
+
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
@@ -605,9 +608,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
- analyticsEngine = new CachingStatusAnalyticEngine(this, componentStatusRepository);
-
-
this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
index 5241c4a..864a5d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/CachingStatusAnalyticEngine.java
@@ -2,6 +2,7 @@ package org.apache.nifi.controller.status.analytics;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.math3.stat.regression.SimpleRegression;
import org.apache.nifi.connectable.Connection;
@@ -28,6 +29,7 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
this.controller = controller;
this.statusRepository = statusRepository;
this.cache = Caffeine.newBuilder()
+ .expireAfterWrite(1,TimeUnit.MINUTES)
.build();
}
@@ -38,15 +40,37 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
Connection connection = rootGroup.findConnection(connectionId);
SimpleRegression cachedRegression = cache.getIfPresent(connection.getIdentifier());
- if(cachedRegression != null) {
- cache.put(connection.getIdentifier(), cachedRegression);
+ if(cachedRegression == null) {
+ cachedRegression = getBackPressureRegressionModel(connection);
+ if(cachedRegression != null)
+ cache.put(connection.getIdentifier(), cachedRegression);
}
ConnectionStatusAnalytics cachedResult = calculate(cachedRegression,connection);
- LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getMinTimeToBackpressureMillis() );
+ LOG.info("Connection: " + connectionId + " Cached backpressure Time: " + cachedResult.getTimeToCountBackpressureMillis());
return cachedResult;
}
+ @Override
+ public long getTimeToBytesBackpressureMillis(String connectionId) {
+ return 0;
+ }
+
+ @Override
+ public long getTimeToCountBackpressureMillis(String connectionId) {
+ return getConnectionStatusAnalytics(connectionId).getTimeToCountBackpressureMillis();
+ }
+
+ @Override
+ public long getNextIntervalBytes(String connectionId) {
+ return 0;
+ }
+
+ @Override
+ public int getNextIntervalCount(String connectionId) {
+ return 0;
+ }
+
protected ConnectionStatusAnalytics calculate(SimpleRegression regression, Connection conn){
long backPressureObjectThreshold = conn.getFlowFileQueue().getBackPressureObjectThreshold();
@@ -77,14 +101,30 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
return conn.getName();
}
+
+ @Override
+ public String getId() {
+ return conn.getIdentifier();
+ }
+
@Override
- public long getMinTimeToBackpressureMillis() {
+ public long getTimeToBytesBackpressureMillis() {
+ return 0;
+ }
+
+ @Override
+ public long getTimeToCountBackpressureMillis() {
return connTimeToBackpressure;
}
@Override
- public String getId() {
- return conn.getIdentifier();
+ public long getNextIntervalBytes() {
+ return 0;
+ }
+
+ @Override
+ public int getNextIntervalCount() {
+ return 0;
}
@Override
@@ -111,7 +151,7 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
* @return
*/
protected SimpleRegression getBackPressureRegressionModel(Connection conn) {
- Date minDate = new Date(System.currentTimeMillis() - (5 * 60 * 1000));
+ Date minDate = new Date(System.currentTimeMillis() - (60 * 1000));
StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
statusRepository.getConnectionStatusHistory(conn.getIdentifier(), minDate, null, Integer.MAX_VALUE));
List<StatusSnapshotDTO> aggregateSnapshots = connHistory.getAggregateSnapshots();
@@ -128,7 +168,6 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
Long snapQueuedCount = snap.getStatusMetrics().get(ConnectionStatusDescriptor.QUEUED_COUNT.getField());
long snapTime = snap.getTimestamp().getTime();
regression.addData(snapTime, snapQueuedCount);
- LOG.info("Connection " + conn.getIdentifier() + " statistics: ("+snapTime+","+snapQueuedCount+")");
}
if (regression.getSlope() <= 0 && !conn.getFlowFileQueue().isFull()) {
@@ -141,20 +180,5 @@ public class CachingStatusAnalyticEngine implements StatusAnalytics {
}
- public void refreshModel() {
- ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
- List<Connection> allConnections = rootGroup.findAllConnections();
- cache.invalidateAll();
- for (Connection conn : allConnections) {
- SimpleRegression regression = getBackPressureRegressionModel(conn);
- if(regression != null) {
- cache.put(conn.getIdentifier(), regression);
- }
- }
- }
- @Override
- public long getMinTimeToBackpressureMillis() {
- return 0;
- }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 024c138..5a873d5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -172,22 +172,22 @@ public class StatusAnalyticEngine implements StatusAnalytics {
// TODO - populate the prediction fields. Do we need to pass in connection ID?
@Override
- public long getTimeToCountBackpressureMillis() {
+ public long getTimeToCountBackpressureMillis(String connectionId) {
return 0;
}
@Override
- public long getTimeToBytesBackpressureMillis() {
+ public long getTimeToBytesBackpressureMillis(String connectionId) {
return 0;
}
@Override
- public long getNextIntervalBytes() {
+ public long getNextIntervalBytes(String connectionId) {
return 0;
}
@Override
- public int getNextIntervalCount() {
+ public int getNextIntervalCount(String connectionId) {
return 0;
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index f943856..87fcd4d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -65,10 +65,12 @@ import org.apache.nifi.remote.RemoteGroupPort;
public class StandardEventAccess implements UserAwareEventAccess {
private final FlowFileEventRepository flowFileEventRepository;
private final FlowController flowController;
+ private final StatusAnalytics statusAnalytics;
public StandardEventAccess(final FlowController flowController, final FlowFileEventRepository flowFileEventRepository) {
this.flowController = flowController;
this.flowFileEventRepository = flowFileEventRepository;
+ this.statusAnalytics = flowController.getStatusAnalytics();
}
/**
@@ -339,12 +341,11 @@ public class StandardEventAccess implements UserAwareEventAccess {
bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
}
- final StatusAnalytics statusAnalytics = flowController.getStatusAnalytics();
if (statusAnalytics != null) {
- connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis());
- connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis());
- connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes());
- connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount());
+ connStatus.setPredictedTimeToBytesBackpressureMillis(statusAnalytics.getTimeToBytesBackpressureMillis(conn.getIdentifier()));
+ connStatus.setPredictedTimeToCountBackpressureMillis(statusAnalytics.getTimeToCountBackpressureMillis(conn.getIdentifier()));
+ connStatus.setNextPredictedQueuedBytes(statusAnalytics.getNextIntervalBytes(conn.getIdentifier()));
+ connStatus.setNextPredictedQueuedCount(statusAnalytics.getNextIntervalCount(conn.getIdentifier()));
}
if (isConnectionAuthorized) {