You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/19 13:53:52 UTC

[nifi] 16/21: NIFI-6150 Added tests for connection status analytics class, corrected variable names

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ecce2794a174c025df45c67746d6b56e99ba2f1b
Author: Yolanda Davis <yo...@gmail.com>
AuthorDate: Wed Jul 31 13:31:24 2019 -0400

    NIFI-6150 Added tests for connection status analytics class, corrected variable names
    
    (cherry picked from commit 58c7c81)
---
 .../analytics/ConnectionStatusAnalytics.java       |   6 +-
 .../analytics/TestConnectionStatusAnalytics.java   | 159 +++++++++++++++++++++
 2 files changed, 162 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
index 8b7964e..6b831fa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java
@@ -250,7 +250,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
 
     private final ExtractFunction extract = (metric, statusHistory) -> {
 
-        List<Double> counts = new ArrayList<>();
+        List<Double> values = new ArrayList<>();
         List<Double> times = new ArrayList<>();
 
         StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory);
@@ -258,10 +258,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
         for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) {
             Long snapValue = snap.getStatusMetrics().get(metric);
             long snapTime = snap.getTimestamp().getTime();
-            counts.add((double) snapValue);
+            values.add((double) snapValue);
             times.add((double) snapTime);
         }
-        return new Tuple<>(times.stream(), counts.stream());
+        return new Tuple<>(times.stream(), values.stream());
 
     };
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
new file mode 100644
index 0000000..e17ae9a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java
@@ -0,0 +1,159 @@
+package org.apache.nifi.controller.status.analytics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
+import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.groups.ProcessGroup;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link ConnectionStatusAnalytics}.
+ *
+ * Each test obtains an analytics instance from {@link #getConnectionStatusAnalytics}, which wires
+ * Mockito mocks for the status repository, flow manager, process group, connection and queue, and
+ * serves a synthetic status history made of {@value #SNAPSHOT_COUNT} snapshots.
+ */
+public class TestConnectionStatusAnalytics {
+
+    private static final Set<MetricDescriptor<?>> CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values())
+            .map(ConnectionStatusDescriptor::getDescriptor)
+            .collect(Collectors.toSet());
+
+    /** Number of snapshots placed in the mocked status history. */
+    private static final int SNAPSHOT_COUNT = 10;
+
+    /** Snapshots from this index onward carry doubled values when the status is not constant. */
+    private static final int VARYING_FROM_INDEX = 5;
+
+    /** Distance between consecutive snapshot timestamps, in milliseconds. */
+    private static final long SNAPSHOT_SPACING_MILLIS = 1000L;
+
+    /**
+     * Builds a {@link ConnectionStatusAnalytics} backed entirely by mocks and calls {@code init()} on it.
+     *
+     * The mocked status history contains {@value #SNAPSHOT_COUNT} snapshots spaced
+     * {@value #SNAPSHOT_SPACING_MILLIS} ms apart, starting at the current system time. Each snapshot
+     * records the QUEUED_BYTES and QUEUED_COUNT metrics.
+     *
+     * @param queuedBytes queued-bytes value written into the snapshots
+     * @param queuedCount queued-count value written into the snapshots
+     * @param backPressureDataSizeThreshhold value the mocked queue returns from {@code getBackPressureDataSizeThreshold()}
+     * @param backPressureObjectThreshold value the mocked queue returns from {@code getBackPressureObjectThreshold()}
+     * @param isConstantStatus if {@code true}, every snapshot carries the given values unchanged; if {@code false},
+     *                         snapshots from index {@value #VARYING_FROM_INDEX} onward carry twice the given values
+     * @return the analytics instance after {@code init()} has been invoked
+     */
+    protected ConnectionStatusAnalytics getConnectionStatusAnalytics(Long queuedBytes, Long queuedCount,String backPressureDataSizeThreshhold, Long backPressureObjectThreshold, Boolean isConstantStatus){
+        final ComponentStatusRepository statusRepository = Mockito.mock(ComponentStatusRepository.class);
+        final FlowManager flowManager = Mockito.mock(FlowManager.class);
+        final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class);
+        final StatusHistory statusHistory = Mockito.mock(StatusHistory.class);
+        final Connection connection = Mockito.mock(Connection.class);
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        final String connectionIdentifier = "1";
+
+        final List<Connection> connections = new ArrayList<>();
+        connections.add(connection);
+
+        final List<StatusSnapshot> snapshotList = new ArrayList<>(SNAPSHOT_COUNT);
+        final long startTime = System.currentTimeMillis();
+
+        for (int i = 0; i < SNAPSHOT_COUNT; i++) {
+            // The first half of the history always uses the base values; the second half doubles
+            // them unless a constant status was requested.
+            final boolean useBaseValues = isConstantStatus || i < VARYING_FROM_INDEX;
+            final long bytes = useBaseValues ? queuedBytes : queuedBytes * 2;
+            final long count = useBaseValues ? queuedCount : queuedCount * 2;
+
+            final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(CONNECTION_METRICS);
+            snapshot.setTimestamp(new Date(startTime + i * SNAPSHOT_SPACING_MILLIS));
+            snapshot.addStatusMetric(ConnectionStatusDescriptor.QUEUED_BYTES.getDescriptor(), bytes);
+            snapshot.addStatusMetric(ConnectionStatusDescriptor.QUEUED_COUNT.getDescriptor(), count);
+            snapshotList.add(snapshot);
+        }
+
+        when(flowFileQueue.getBackPressureDataSizeThreshold()).thenReturn(backPressureDataSizeThreshhold);
+        when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(backPressureObjectThreshold);
+        when(connection.getIdentifier()).thenReturn(connectionIdentifier);
+        when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
+        when(processGroup.findAllConnections()).thenReturn(connections);
+        when(statusHistory.getStatusSnapshots()).thenReturn(snapshotList);
+        when(flowManager.getRootGroup()).thenReturn(processGroup);
+        when(statusRepository.getConnectionStatusHistory(anyString(),any(),any(),anyInt())).thenReturn(statusHistory);
+
+        final ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,connectionIdentifier,false);
+        connectionStatusAnalytics.init();
+        return connectionStatusAnalytics;
+    }
+
+    // NOTE: the checks below use JUnit assertions rather than the Java `assert` keyword, which is
+    // skipped whenever the JVM runs without -ea and would let a broken test pass silently.
+
+    @Test
+    public void testGetIntervalTimeMillis(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+        Long interval = connectionStatusAnalytics.getIntervalTimeMillis();
+        assertNotNull(interval);
+        assertEquals(300000L, interval.longValue());
+    }
+
+    @Test
+    public void testGetTimeToCountBackpressureMillisConstantStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+        Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
+        assertNotNull(countTime);
+        assertEquals(-1L, countTime.longValue());
+    }
+
+    @Test
+    public void testGetTimeToCountBackpressureMillisVaryingStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
+        Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis();
+        assertNotNull(countTime);
+        assertTrue("expected a non-negative time but was " + countTime, countTime > -1L);
+    }
+
+    @Test
+    public void testGetTimeToBytesBackpressureMillisConstantStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+        Long bytesTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
+        assertNotNull(bytesTime);
+        assertEquals(-1L, bytesTime.longValue());
+    }
+
+    @Test
+    public void testGetTimeToBytesBackpressureMillisVaryingStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
+        Long bytesTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis();
+        assertNotNull(bytesTime);
+        assertTrue("expected a non-negative time but was " + bytesTime, bytesTime > -1L);
+    }
+
+    @Test
+    public void testGetNextIntervalBytesConstantStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+        Long nextBytes = connectionStatusAnalytics.getNextIntervalBytes();
+        assertNotNull(nextBytes);
+        assertEquals(5000L, nextBytes.longValue());
+    }
+
+    @Test
+    public void testGetNextIntervalBytesVaryingStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
+        Long nextBytes = connectionStatusAnalytics.getNextIntervalBytes();
+        assertNotNull(nextBytes);
+        assertTrue("expected a non-negative prediction but was " + nextBytes, nextBytes > -1L);
+    }
+
+    @Test
+    public void testGetNextIntervalCountConstantStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+        Long nextCount = connectionStatusAnalytics.getNextIntervalCount();
+        assertNotNull(nextCount);
+        assertEquals(50L, nextCount.longValue());
+    }
+
+    @Test
+    public void testGetNextIntervalCountVaryingStatus(){
+        // Uses a varying history (isConstantStatus = false), mirroring testGetNextIntervalBytesVaryingStatus;
+        // previously this test duplicated the constant-status case.
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false);
+        Long nextCount = connectionStatusAnalytics.getNextIntervalCount();
+        assertNotNull(nextCount);
+        assertTrue("expected a non-negative prediction but was " + nextCount, nextCount > -1L);
+    }
+
+    @Test
+    public void testGetNextIntervalPercentageUseBytesConstantStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(50000L, 50L, "1MB", 100L, true);
+        Long nextBytesPercentage = connectionStatusAnalytics.getNextIntervalPercentageUseBytes();
+        assertNotNull(nextBytesPercentage);
+        assertEquals(5L, nextBytesPercentage.longValue());
+    }
+
+    @Test
+    public void testGetNextIntervalPercentageUseCountConstantStatus(){
+        ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true);
+        Long nextCountPercentage = connectionStatusAnalytics.getNextIntervalPercentageUseCount();
+        assertNotNull(nextCountPercentage);
+        assertEquals(50L, nextCountPercentage.longValue());
+    }
+
+}