You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/07/26 15:27:38 UTC

[nifi] branch master updated: NIFI-6433 Update getStatusHistory to honor filter values.

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new d1c30d1  NIFI-6433 Update getStatusHistory to honor filter values.
d1c30d1 is described below

commit d1c30d14777354de2288b67184ffe9e1ceb98cf1
Author: Mark Owens <jm...@apache.org>
AuthorDate: Thu Jul 11 15:26:41 2019 -0400

    NIFI-6433 Update getStatusHistory to honor filter values.
    
    This modification updates the getStatusHistory method of
    VolatileComponentStatusRepository to utilize the start, end, and
    preferredDataPoints parameters when retrieving status histories.
    
    When calling the various get<XXXX>StatusHistory methods of
    VolatileComponentStatusRepository, they are all passed start, end,
    and preferredDataPoints values to allow the filtering of histories
    according to the provided parameters. But the follow-on method calls ignore
    those values completely. This ticket updates those methods to honor the
    parameters and filter the returned Dates accordingly.
    
    This closes #3579.
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../history/VolatileComponentStatusRepository.java |  39 +++-
 .../VolatileComponentStatusRepositoryTest.java     | 230 +++++++++++++++++++++
 2 files changed, 261 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index b280214..6c95843 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -57,7 +57,8 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
 
     private final Map<String, ComponentStatusHistory> componentStatusHistories = new HashMap<>();
 
-    private final RingBuffer<Date> timestamps;
+    // Changed to protected to allow unit testing
+    protected final RingBuffer<Date> timestamps;
     private final RingBuffer<List<GarbageCollectionStatus>> gcStatuses;
     private final int numDataPoints;
     private volatile long lastCaptureTime = 0L;
@@ -145,35 +146,57 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
 
     @Override
     public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) {
-        return getStatusHistory(processorId, includeCounters, DEFAULT_PROCESSOR_METRICS);
+        return getStatusHistory(processorId, includeCounters, DEFAULT_PROCESSOR_METRICS, start, end, preferredDataPoints);
     }
 
     @Override
     public StatusHistory getConnectionStatusHistory(final String connectionId, final Date start, final Date end, final int preferredDataPoints) {
-        return getStatusHistory(connectionId, true, DEFAULT_CONNECTION_METRICS);
+        return getStatusHistory(connectionId, true, DEFAULT_CONNECTION_METRICS, start, end, preferredDataPoints);
     }
 
     @Override
     public StatusHistory getProcessGroupStatusHistory(final String processGroupId, final Date start, final Date end, final int preferredDataPoints) {
-        return getStatusHistory(processGroupId, true, DEFAULT_GROUP_METRICS);
+        return getStatusHistory(processGroupId, true, DEFAULT_GROUP_METRICS, start, end, preferredDataPoints);
     }
 
     @Override
     public StatusHistory getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date start, final Date end, final int preferredDataPoints) {
-        return getStatusHistory(remoteGroupId, true, DEFAULT_RPG_METRICS);
+        return getStatusHistory(remoteGroupId, true, DEFAULT_RPG_METRICS, start, end, preferredDataPoints);
     }
 
 
-    private synchronized StatusHistory getStatusHistory(final String componentId, final boolean includeCounters, final Set<MetricDescriptor<?>> defaultMetricDescriptors) {
+    // Looks up the recorded history for componentId and converts it to a StatusHistory using
+    // only the timestamps selected by filterDates(start, end, preferredDataPoints). When no
+    // history exists for componentId, the result of createEmptyStatusHistory() is returned.
+    // Synchronized, presumably to guard the shared history map and timestamps buffer.
+    private synchronized StatusHistory getStatusHistory(final String componentId,
+        final boolean includeCounters, final Set<MetricDescriptor<?>> defaultMetricDescriptors,
+        final Date start, final Date end, final int preferredDataPoints) {
         final ComponentStatusHistory history = componentStatusHistories.get(componentId);
         if (history == null) {
             return createEmptyStatusHistory();
         }
-
-        final List<Date> dates = timestamps.asList();
+        final List<Date> dates = filterDates(start, end, preferredDataPoints);
         return history.toStatusHistory(dates, includeCounters, defaultMetricDescriptors);
     }
 
+    // Given a buffer, return a list of Dates based on start/end/preferredDataPoints
+    protected List<Date> filterDates(final Date start, final Date end, final int preferredDataPoints) {
+        Date startDate = (start == null) ? new Date(0L) : start;
+        Date endDate = (end == null) ? new Date() : end;
+
+        // Limit date information to a subset based upon input parameters
+        List<Date> filteredDates =
+            timestamps.asList()
+                .stream()
+                .filter(p -> (p.after(startDate) || p.equals(startDate))
+                    && (p.before(endDate) || p.equals(endDate))).collect(Collectors.toList());
+
+        // if preferredDataPoints != Integer.MAX_VALUE, Dates returned will be reduced further
+        return filteredDates.subList(Math.max(filteredDates.size() - preferredDataPoints, 0), filteredDates.size());
+    }
+
+
     private StatusHistory createEmptyStatusHistory() {
         final Date dateGenerated = new Date();
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepositoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepositoryTest.java
new file mode 100644
index 0000000..e7532ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepositoryTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.nifi.util.NiFiProperties;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * This class verifies the VolatileComponentStatusRepository getConnectionStatusHistory method
+ * honors the start/end/preferredDataPoints variables by testing the filterDates function.
+ */
+public class VolatileComponentStatusRepositoryTest {
+
+  private static VolatileComponentStatusRepository repo1;
+  private static VolatileComponentStatusRepository repo2;
+  private static VolatileComponentStatusRepository repo3;
+  private static final int FIVE_MINUTES = 300000;
+  private static int BUFSIZE3 = 10;
+
+  @BeforeClass
+  public static void createBuffers() {
+    NiFiProperties props1 = mock(NiFiProperties.class);
+    int BUFSIZE1 = 1_000_000;
+    when(props1.getIntegerProperty(anyString(), anyInt())).thenReturn(BUFSIZE1);
+    repo1 = new VolatileComponentStatusRepository(props1);
+    // Fill the repo1 buffer completely with Date objects at five-minute intervals
+    // This provides dates up to around Jul 1979
+    for (long i = 0; i < BUFSIZE1; i++) {
+      repo1.timestamps.add(new Date(i * FIVE_MINUTES));
+    }
+    assertEquals(BUFSIZE1, repo1.timestamps.getSize());
+
+    NiFiProperties props2 = mock(NiFiProperties.class);
+    int BUFSIZE2 = 1000;
+    when(props2.getIntegerProperty(anyString(), anyInt())).thenReturn(BUFSIZE2);
+    repo2 = new VolatileComponentStatusRepository(props2);
+    int OFFSET = 10;
+    // Verify partially filled buffers work as expected.
+    for (long i = 0; i < BUFSIZE2 - OFFSET; i++) {
+      repo2.timestamps.add(new Date(i * FIVE_MINUTES));
+    }
+    assertEquals(BUFSIZE2 - OFFSET, repo2.timestamps.getSize());
+
+    NiFiProperties props3 = mock(NiFiProperties.class);
+    when(props3.getIntegerProperty(anyString(), anyInt())).thenReturn(BUFSIZE3);
+    repo3 = new VolatileComponentStatusRepository(props3);
+  }
+
+  private Date asDate(LocalDateTime localDateTime) { // UTC: fixture timestamps are epoch-aligned
+    return Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
+  }
+
+  @Test
+  public void testFilterDatesReturnAll() {
+    List<Date> dates = repo1.filterDates(null, null, Integer.MAX_VALUE);
+    assert repo1.timestamps != null;
+    assertEquals(repo1.timestamps.getSize(), dates.size());
+    assertTrue(dates.equals(repo1.timestamps.asList()));
+    repo1.timestamps.add(new Date());
+
+    dates = repo2.filterDates(null, null, Integer.MAX_VALUE);
+    assertEquals(repo2.timestamps.getSize(), dates.size());
+    assertTrue(dates.equals(repo2.timestamps.asList()));
+    repo1.timestamps.add(new Date());
+  }
+
+  @Test
+  public void testFilterDatesUsingPreferredDataPoints() {
+    List<Date> dates = repo1.filterDates(null, null, 1);
+    assertEquals(1, dates.size());
+    assertEquals(repo1.timestamps.getNewestElement(), dates.get(0));
+
+    int numPoints = 14;
+    dates = repo1.filterDates(null, null, numPoints);
+    assertEquals(numPoints, dates.size());
+    assertEquals(repo1.timestamps.getNewestElement(), dates.get(dates.size()-1));
+    assertEquals(repo1.timestamps.asList().get(repo1.timestamps.getSize() - numPoints), dates.get(0));
+
+    numPoints = 22;
+    dates = repo2.filterDates(null, null, numPoints);
+    assertEquals(numPoints, dates.size());
+    assertEquals(repo2.timestamps.getNewestElement(), dates.get(dates.size()-1));
+    assertEquals(repo2.timestamps.asList().get(repo2.timestamps.getSize() - numPoints),
+        dates.get(0));
+  }
+
+  @Test
+  public void testFilterDatesUsingStartFilter() {
+    // Filter with date that exactly matches an entry in timestamps buffer
+    Date start = asDate(LocalDateTime.of(1978, 1, 1, 0, 45, 0));
+    List<Date> dates = repo1.filterDates(start, null, Integer.MAX_VALUE);
+    assertEquals(start, dates.get(0));
+    assertEquals(repo1.timestamps.getNewestElement(), dates.get(dates.size()-1));
+
+    // filter using a date that does not exactly match the time, i.e., not on a five-minute mark
+    start = asDate(LocalDateTime.of(1974, 1, 1, 3, 2, 0));
+    dates = repo1.filterDates(start, null, Integer.MAX_VALUE);
+    assertTrue(start.getTime() < dates.get(0).getTime());
+    assertTrue(dates.get(0).getTime() < (start.getTime() + FIVE_MINUTES));
+    assertEquals(repo1.timestamps.getNewestElement(), dates.get(dates.size()-1));
+
+    start = asDate(LocalDateTime.of(1970, 1, 1, 0, 0, 0));
+    dates = repo2.filterDates(start, null, Integer.MAX_VALUE);
+    assertEquals(start, dates.get(0));
+    assertEquals(repo2.timestamps.getNewestElement(), dates.get(dates.size()-1));
+  }
+
+  @Test
+  public void testFilterDatesUsingEndFilter() {
+    // Filter with date that exactly matches an entry in timestamps buffer
+    Date end = asDate(LocalDateTime.of(1970, 2, 1,1, 10, 0));
+    List<Date> dates = repo1.filterDates(null, end, Integer.MAX_VALUE);
+    assertEquals(end, dates.get(dates.size()-1));
+    assertEquals(repo1.timestamps.getOldestElement(), dates.get(0));
+
+    // filter using a date that does not exactly match the times in buffer
+    end = asDate(LocalDateTime.of(1970, 2, 1,1, 7, 0));
+    dates = repo1.filterDates(null, end, Integer.MAX_VALUE);
+    assertTrue(dates.get(dates.size()-1).getTime() < end.getTime());
+    assertTrue((end.getTime() - FIVE_MINUTES) < dates.get(dates.size()-1).getTime());
+    assertEquals(dates.get(0), repo1.timestamps.getOldestElement());
+
+    end = asDate(LocalDateTime.of(1970, 1, 2,1, 7, 0));
+    dates = repo2.filterDates(null, end, Integer.MAX_VALUE);
+    assertTrue(dates.get(dates.size()-1).getTime() < end.getTime());
+    assertTrue((end.getTime() - FIVE_MINUTES) < dates.get(dates.size()-1).getTime());
+    assertEquals(repo2.timestamps.asList().get(0), dates.get(0));
+  }
+
+  @Test
+  public void testFilterDatesUsingStartAndEndFilter() {
+    // Bounds that exactly match entries in the timestamps buffer are returned as the endpoints
+    Date start = asDate(LocalDateTime.of(1975, 3, 1, 3, 15, 0));
+    Date end = asDate(LocalDateTime.of(1978, 4, 2,4, 25, 0));
+    List<Date> dates = repo1.filterDates(start, end, Integer.MAX_VALUE);
+    assertEquals(start, dates.get(0));
+    assertEquals(end, dates.get(dates.size()-1));
+
+    // Bounds that do not exactly match entries: endpoints lie within five minutes inside them
+    start = asDate(LocalDateTime.of(1975, 3, 1, 3, 3, 0));
+    end = asDate(LocalDateTime.of(1977, 4, 2,4, 8, 0));
+    dates = repo1.filterDates(start, end, Integer.MAX_VALUE);
+    assertTrue(start.getTime() < dates.get(0).getTime());
+    assertTrue(dates.get(0).getTime() < (start.getTime() + FIVE_MINUTES));
+    assertTrue(dates.get(dates.size()-1).getTime() < end.getTime());
+    assertTrue((end.getTime() - FIVE_MINUTES) < dates.get(dates.size()-1).getTime());
+
+    start = asDate(LocalDateTime.of(1970, 1, 1, 3, 15, 0));
+    end = asDate(LocalDateTime.of(1970, 1, 2,4, 25, 0));
+    dates = repo1.filterDates(start, end, Integer.MAX_VALUE);
+    assertEquals(start, dates.get(0));
+    assertEquals(end, dates.get(dates.size()-1));
+  }
+
+  @Test
+  public void testFilterDatesUsingStartEndAndPreferredFilter() {
+    // Bounds that exactly match entries in the timestamps buffer; expect the last numPoints of them
+    int numPoints = 5;
+    Date start = asDate(LocalDateTime.of(1977, 1, 1, 0, 30, 0));
+    Date end = asDate(LocalDateTime.of(1977, 2, 1,1, 0, 0));
+    List<Date> dates = repo1.filterDates(start, end, numPoints);
+    assertEquals(numPoints, dates.size());
+    assertEquals(dates.get(dates.size()-1), end);
+    assertEquals(dates.get(dates.size()-numPoints), new Date(end.getTime() - (numPoints-1)*FIVE_MINUTES));
+
+    // Bounds that do not exactly match entries in the timestamps buffer
+    start = asDate(LocalDateTime.of(1975, 1, 1, 0, 31, 0));
+    end = asDate(LocalDateTime.of(1978, 2, 1,1, 59, 0));
+    dates = repo1.filterDates(start, end, numPoints);
+    assertTrue(dates.get(0).getTime() < new Date(end.getTime() - (numPoints-1)*FIVE_MINUTES).getTime());
+    assertTrue(new Date(end.getTime() - (numPoints * FIVE_MINUTES)).getTime() < dates.get(0).getTime());
+    assertTrue(dates.get(dates.size()-1).getTime() < end.getTime());
+    assertTrue((end.getTime() - FIVE_MINUTES) < dates.get(dates.size() - 1).getTime());
+    assertEquals(numPoints, dates.size());
+
+    start = asDate(LocalDateTime.of(1970, 1, 1, 0, 31, 0));
+    end = asDate(LocalDateTime.of(1970, 1, 1,1, 59, 0));
+    dates = repo2.filterDates(start, end, numPoints);
+    assertTrue(dates.get(0).getTime() < new Date(end.getTime() - (numPoints-1)*FIVE_MINUTES).getTime());
+    assertTrue(new Date(end.getTime() - (numPoints * FIVE_MINUTES)).getTime() < dates.get(0).getTime());
+    assertTrue(dates.get(dates.size()-1).getTime() < end.getTime());
+    assertTrue((end.getTime() - FIVE_MINUTES) < dates.get(dates.size() - 1).getTime());
+    assertEquals(numPoints, dates.size());
+  }
+
+  @Test
+  public void testFilterWorksWithCircularBuffer() {
+    // Fill repo3 with Date objects at five-minute intervals
+    // This repository is used to verify circular actions behave as expected.
+    for (int i = 0; i < 25; i++) {
+      repo3.timestamps.add(new Date(i * FIVE_MINUTES));
+      List<Date> dates = repo3.filterDates(null, null, Integer.MAX_VALUE);
+      if (i < BUFSIZE3 - 1) {
+        assertEquals(null, repo3.timestamps.getOldestElement());
+        assertEquals(repo3.timestamps.asList().get(0), dates.get(0));
+      } else {
+        assertEquals(repo3.timestamps.getOldestElement(), dates.get(0));
+      }
+      assertEquals(repo3.timestamps.getNewestElement(), dates.get(dates.size() - 1));
+    }
+  }
+}