You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/19 13:53:52 UTC
[nifi] 16/21: NIFI-6150 Added tests for connection status analytics
class, corrected variable names
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ecce2794a174c025df45c67746d6b56e99ba2f1b
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Wed Jul 31 13:31:24 2019 -0400
NIFI-6150 Added tests for connection status analytics class, corrected variable names
(cherry picked from commit 58c7c81)
---
.../analytics/ConnectionStatusAnalytics.java | 6 +-
.../analytics/TestConnectionStatusAnalytics.java | 159 +++++++++++++++++++++
2 files changed, 162 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 8b7964e..6b831fa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -250,7 +250,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
private final ExtractFunction extract = (metric, statusHistory) -> {
- List<Double> counts = new ArrayList<>();
+ List<Double> values = new ArrayList<>();
List<Double> times = new ArrayList<>();
StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory);
@@ -258,10 +258,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) {
Long snapValue = snap.getStatusMetrics().get(metric);
long snapTime = snap.getTimestamp().getTime();
- counts.add((double) snapValue);
+ values.add((double) snapValue);
times.add((double) snapTime);
}
- return new Tuple<>(times.stream(), counts.stream());
+ return new Tuple<>(times.stream(), values.stream());
};
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
new file mode 100644
index 0000000..e17ae9a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
@@ -0,0 +1,159 @@
+package org.apache.nifi.controller.status.analytics;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
+import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.groups.ProcessGroup;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestConnectionStatusAnalytics {
+
+ private static final Set<MetricDescriptor<?>> CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values())
+ .map(ConnectionStatusDescriptor::getDescriptor)
+ .collect(Collectors.toSet());
+
+ protected ConnectionStatusAnalytics getConnectionStatusAnalytics(Long queuedBytes, Long queuedCount,String backPressureDataSizeThreshhold, Long backPressureObjectThreshold, Boolean isConstantStatus){
+ ComponentStatusRepository statusRepository = Mockito.mock(ComponentStatusRepository.class);
+ FlowManager flowManager;flowManager = Mockito.mock(FlowManager.class);
+ final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+ final StatusHistory statusHistory = Mockito.mock(StatusHistory.class);
+ final Connection connection = Mockito.mock(Connection.class);
+ final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ final List<Connection> connections = new ArrayList<>();
+ final String connectionIdentifier = "1";
+ connections.add(connection);
+
+ List<StatusSnapshot> snapshotList = new ArrayList<>();
+ final long startTime = System.currentTimeMillis();
+ int iterations = 10;
+
+ for (int i=0; i < iterations; i++) {
+ final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(CONNECTION_METRICS);
+ snapshot.setTimestamp(new Date(startTime + i * 1000));
+ snapshot.addStatusMetric(ConnectionStatusDescriptor.QUEUED_BYTES.getDescriptor(), (isConstantStatus || i < 5) ? queuedBytes: queuedBytes * 2);
+ snapshot.addStatusMetric(ConnectionStatusDescriptor.QUEUED_COUNT.getDescriptor(), (isConstantStatus || i < 5) ? queuedCount: queuedCount * 2);
+ snapshotList.add(snapshot);
+ }
+
+ when(flowFileQueue.getBackPressureDataSizeThreshold()).thenReturn(backPressureDataSizeThreshhold);
+ when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(backPressureObjectThreshold);
+ when(connection.getIdentifier()).thenReturn(connectionIdentifier);
+ when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
+ when(processGroup.findAllConnections()).thenReturn(connections);
+ when(statusHistory.getStatusSnapshots()).thenReturn(snapshotList);
+ when(flowManager.getRootGroup()).thenReturn(processGroup);
+ when(statusRepository.getConnectionStatusHistory(anyString(),any(),any(),anyInt())).thenReturn(statusHistory);
+ ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,connectionIdentifier,false);
+ connectionStatusAnalytics.init();
+ return connectionStatusAnalytics;
+ }
+
+ @Test
+ public void testGetIntervalTimeMillis(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+ Long interval = connectionStatusAnalytics.getIntervalTimeMillis();
+ assertNotNull(interval);
+ assert(interval == 300000);
+ }
+
+ @Test
+ public void testGetTimeToCountBackpressureMillisConstantStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+ Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
+ assertNotNull(countTime);
+ assert(countTime == -1L);
+ }
+
+ @Test
+ public void testGetTimeToCountBackpressureMillisVaryingStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
+ Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
+ assertNotNull(countTime);
+ assert(countTime > -1L);
+ }
+
+ @Test
+ public void testGetTimeToBytesBackpressureMillisConstantStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+ Long bytesTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
+ assertNotNull(bytesTime);
+ assert(bytesTime == -1L);
+ }
+
+ @Test
+ public void testGetTimeToBytesBackpressureMillisVaryingStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
+ Long bytesTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
+ assertNotNull(bytesTime);
+ assert(bytesTime > -1L);
+ }
+
+ @Test
+ public void testGetNextIntervalBytesConstantStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+ Long nextBytes = connectionStatusAnalytics.getNextIntervalBytes();
+ assertNotNull(nextBytes);
+ assert(nextBytes == 5000L);
+ }
+
+ @Test
+ public void testGetNextIntervalBytesVaryingStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
+ Long nextBytes = connectionStatusAnalytics.getNextIntervalBytes();
+ assertNotNull(nextBytes);
+ assert(nextBytes > -1L);
+ }
+
+ @Test
+ public void testGetNextIntervalCountConstantStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+ Long nextCount = connectionStatusAnalytics.getNextIntervalCount();
+ assertNotNull(nextCount);
+ assert(nextCount == 50L);
+ }
+
+ @Test
+ public void testGetNextIntervalCountVaryingStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+ Long nextCount = connectionStatusAnalytics.getNextIntervalCount();
+ assertNotNull(nextCount);
+ assert(nextCount == 50L);
+ }
+
+ @Test
+ public void testGetNextIntervalPercentageUseBytesConstantStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(50000L, 50L, "1MB", 100L, true);
+ Long nextBytesPercentage = connectionStatusAnalytics.getNextIntervalPercentageUseBytes();
+ assertNotNull(nextBytesPercentage);
+ assert(nextBytesPercentage == 5);
+ }
+
+ @Test
+ public void testGetNextIntervalPercentageUseCountConstantStatus(){
+ ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+ Long nextCountPercentage = connectionStatusAnalytics.getNextIntervalPercentageUseCount();
+ assertNotNull(nextCountPercentage);
+ assert(nextCountPercentage == 50);
+ }
+
+}