You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text rendering.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/05/09 16:22:16 UTC

[nifi] branch main updated: NIFI-9993: Fixed an initialization bug in which the Content Repository did not correctly increment the counter of files in the archive directories. In some cases this caused the counter to become negative, which made processors incorrectly pause to wait for content archive cleanup even though no files were archived.

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b25201dd22 NIFI-9993: Fixed an initialization bug in which the Content Repository did not correctly increment the counter of files in the archive directories. In some cases this caused the counter to become negative, which made processors incorrectly pause to wait for content archive cleanup even though no files were archived.
b25201dd22 is described below

commit b25201dd222d8847427bddd6c5a681d41b613bf2
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri May 6 10:11:10 2022 -0400

    NIFI-9993: Fixed an initialization bug in which the Content Repository did not correctly increment the counter of files in the archive directories. In some cases this caused the counter to become negative, which made processors incorrectly pause to wait for content archive cleanup even though no files were archived.
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    This closes #6021.
---
 .../repository/FileSystemRepository.java           | 35 +++++++++++++++++-----
 .../repository/TestFileSystemRepository.java       | 13 ++++++++
 2 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 52dad6e1fa..4bebb44bbe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -357,8 +357,7 @@ public class FileSystemRepository implements ContentRepository {
                             }
 
                             // Check if this is an 'archive' directory
-                            final Path relativePath = realPath.relativize(file);
-                            if (relativePath.getNameCount() > 3 && ARCHIVE_DIR_NAME.equals(relativePath.subpath(1, 2).toString())) {
+                            if (isArchived(file)) {
                                 final long lastModifiedTime = getLastModTime(file);
 
                                 if (lastModifiedTime < oldestDateHolder.get()) {
@@ -401,6 +400,21 @@ public class FileSystemRepository implements ContentRepository {
         containers.putAll(realPathMap);
     }
 
+    // Visible for testing
+    boolean isArchived(final Path path) {
+        return isArchived(path.toFile());
+    }
+
+    // Visible for testing
+    boolean isArchived(final File file) {
+        final File parentFile = file.getParentFile();
+        if (parentFile == null) {
+            return false;
+        }
+
+        return ARCHIVE_DIR_NAME.equals(parentFile.getName());
+    }
+
     @Override
     public Set<String> getContainerNames() {
         return new HashSet<>(containerNames);
@@ -1194,7 +1208,7 @@ public class FileSystemRepository implements ContentRepository {
     // marked protected for visibility and ability to override for unit tests.
     protected boolean archive(final Path curPath) throws IOException {
         // check if already archived
-        final boolean alreadyArchived = ARCHIVE_DIR_NAME.equals(curPath.getParent().toFile().getName());
+        final boolean alreadyArchived = isArchived(curPath);
         if (alreadyArchived) {
             return false;
         }
@@ -1349,6 +1363,8 @@ public class FileSystemRepository implements ContentRepository {
         // Go through each container and grab the archived data into a List
         archiveExpirationLog.debug("Searching for more archived data to expire");
         final StopWatch stopWatch = new StopWatch(true);
+        final AtomicLong expiredFilesDeleted = new AtomicLong(0L);
+        final AtomicLong expiredBytesDeleted = new AtomicLong(0L);
         for (int i = 0; i < SECTIONS_PER_CONTAINER; i++) {
             final Path sectionContainer = container.resolve(String.valueOf(i));
             final Path archive = sectionContainer.resolve("archive");
@@ -1368,6 +1384,9 @@ public class FileSystemRepository implements ContentRepository {
                         final long lastModTime = getLastModTime(file);
                         if (lastModTime < timestampThreshold) {
                             try {
+                                expiredFilesDeleted.incrementAndGet();
+                                expiredBytesDeleted.addAndGet(file.toFile().length());
+
                                 Files.deleteIfExists(file);
                                 containerState.decrementArchiveCount();
                                 LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because it was older than the configured max archival duration",
@@ -1443,7 +1462,7 @@ public class FileSystemRepository implements ContentRepository {
 
         // Remove the first 'counter' elements from the list because those were removed.
         notYetExceedingThreshold.subList(0, archiveFilesDeleted).clear();
-        LOG.info("Successfully deleted {} files ({}) from archive", archiveFilesDeleted, FormatUtils.formatDataSize(archiveBytesDeleted));
+        LOG.info("Successfully deleted {} files ({}) from archive", (archiveFilesDeleted + expiredFilesDeleted.get()), FormatUtils.formatDataSize(archiveBytesDeleted + expiredBytesDeleted.get()));
 
         final long deleteOldestMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - sortRemainingMillis - deleteExpiredMillis;
 
@@ -1708,7 +1727,7 @@ public class FileSystemRepository implements ContentRepository {
                 while (isWaitRequired()) {
                     try {
                         final String message = String.format("Unable to write flowfile content to content repository container %s due to archive file size constraints;" +
-                                " waiting for archive cleanup", containerName);
+                                " waiting for archive cleanup. Total number of files currently archived = %s", containerName, archivedFileCount.get());
                         LOG.warn(message);
                         eventReporter.reportEvent(Severity.WARNING, "FileSystemRepository", message);
                         condition.await();
@@ -1727,8 +1746,9 @@ public class FileSystemRepository implements ContentRepository {
 
             lock.lock();
             try {
+                long free = 0;
                 try {
-                    final long free = getContainerUsableSpace(containerName);
+                    free = getContainerUsableSpace(containerName);
                     bytesUsed = capacity - free;
                     checkUsedCutoffTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1L);
                 } catch (final Exception e) {
@@ -1737,7 +1757,8 @@ public class FileSystemRepository implements ContentRepository {
                     checkUsedCutoffTimestamp = 0L; // Signal that the free space should be calculated again next time it's checked.
                 }
 
-                LOG.debug("Container {} signaled to allow Content Claim Creation", containerName);
+                LOG.info("Archive cleanup completed for container {}; will now allow writing to this container. Bytes used = {}, bytes free = {}, capacity = {}", containerName,
+                    FormatUtils.formatDataSize(bytesUsed), FormatUtils.formatDataSize(free), FormatUtils.formatDataSize(capacity));
                 condition.signalAll();
             } finally {
                 lock.unlock();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 80a8f9d576..e05938ac93 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -46,6 +46,7 @@ import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.text.NumberFormat;
@@ -127,6 +128,18 @@ public class TestFileSystemRepository {
                 + NumberFormat.getNumberInstance(Locale.US).format(bytesToWrite) + " bytes) for a write rate of " + mbps + " MB/s");
     }
 
+    @Test
+    public void testIsArchived() {
+        assertFalse(repository.isArchived(Paths.get("1.txt")));
+        assertFalse(repository.isArchived(Paths.get("a/1.txt")));
+        assertFalse(repository.isArchived(Paths.get("a/b/1.txt")));
+        assertFalse(repository.isArchived(Paths.get("a/archive/b/c/1.txt")));
+
+        assertTrue(repository.isArchived(Paths.get("archive/1.txt")));
+        assertTrue(repository.isArchived(Paths.get("a/archive/1.txt")));
+        assertTrue(repository.isArchived(Paths.get("a/b/c/archive/1.txt")));
+    }
+
     @Test
     public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException {
         final File contentFile = new File("target/content_repository/0/0.bin");