Posted to commits@nifi.apache.org by jg...@apache.org on 2022/02/22 17:07:12 UTC

[nifi] branch main updated: NIFI-9704: Updated the ContentRepositoryScanTask to show details of how much content in the content repo is retained by each queue in the dataflow. Changed default for nifi.content.claim.max.appendable.size property from 1 MB to 50 KB. Updated docs to reflect the new default value and explain what the property does and how it's used.

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d0a23bc  NIFI-9704: Updated the ContentRepositoryScanTask to show details of how much content in the content repo is retained by each queue in the dataflow. Changed default for nifi.content.claim.max.appendable.size property from 1 MB to 50 KB. Updated docs to reflect the new default value and explain what the property does and how it's used.
d0a23bc is described below

commit d0a23bc26baf7775affebaf3de66fb4d674f17c0
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Feb 17 10:15:05 2022 -0500

    NIFI-9704: Updated the ContentRepositoryScanTask to show details of how much content in the content repo is retained by each queue in the dataflow. Changed default for nifi.content.claim.max.appendable.size property from 1 MB to 50 KB. Updated docs to reflect the new default value and explain what the property does and how it's used.
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5780.
---
 .../src/main/asciidoc/administration-guide.adoc    |  7 +-
 .../controller/repository/ContentRepository.java   |  7 ++
 .../repository/FileSystemRepository.java           | 10 +++
 .../repository/VolatileContentRepository.java      | 32 +++++----
 .../bootstrap/tasks/ContentRepositoryScanTask.java | 84 +++++++++++++++++++++-
 .../repository/StandardProcessSessionIT.java       |  5 ++
 .../nifi-framework/nifi-resources/pom.xml          |  2 +-
 .../repository/ByteArrayContentRepository.java     |  5 ++
 .../StatelessFileSystemContentRepository.java      |  5 ++
 9 files changed, 139 insertions(+), 18 deletions(-)
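
The trade-off behind the new `nifi.content.claim.max.appendable.size` default can be sketched with a toy model. The class below is not NiFi code: `ClaimPackingSketch` and `countBackingFiles` are hypothetical names, and the model assumes only what the updated documentation states, namely that content keeps being appended to the same file until that file reaches the threshold.

```java
import java.util.Collections;
import java.util.List;

/** Simplified model of packing FlowFile content into backing files; not NiFi's implementation. */
final class ClaimPackingSketch {

    /**
     * Counts the backing files needed when each FlowFile's content is appended to the
     * current file, and a new file is started once the current one has reached the threshold.
     */
    static int countBackingFiles(final List<Long> flowFileSizes, final long maxAppendableBytes) {
        int files = 0;
        long currentFileSize = 0L;
        boolean fileOpen = false;

        for (final long size : flowFileSizes) {
            if (!fileOpen) {
                // Start a new backing file for this content.
                files++;
                currentFileSize = 0L;
                fileOpen = true;
            }

            currentFileSize += size;

            // Once the threshold is reached, nothing more is appended to this file.
            if (currentFileSize >= maxAppendableBytes) {
                fileOpen = false;
            }
        }

        return files;
    }

    public static void main(final String[] args) {
        // 10,000 FlowFiles of 120 bytes each, as in the documentation's example size.
        final List<Long> sizes = Collections.nCopies(10_000, 120L);

        System.out.println("50 KB threshold: " + countBackingFiles(sizes, 50L * 1024));
        System.out.println("1 MB threshold:  " + countBackingFiles(sizes, 1024L * 1024));
        System.out.println("1 byte threshold: " + countBackingFiles(sizes, 1L));
    }
}
```

In this model, a smaller threshold produces more files, but each file holds less data. Because a file can be deleted only once no FlowFile points to it, a single lingering 120-byte FlowFile pins roughly one threshold's worth of content: about 1 MB under the old default and about 50 KB under the new one.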

diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 6501c27..d0db77b 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -3312,8 +3312,11 @@ FlowFile Repository, if also on that disk, could become corrupt. To avoid this s
 |====
 |*Property*|*Description*
 |`nifi.content.repository.implementation`|The Content Repository implementation. The default value is `org.apache.nifi.controller.repository.FileSystemRepository` and should only be changed with caution. To store flowfile content in memory instead of on disk (at the risk of data loss in the event of power/machine failure), set this property to `org.apache.nifi.controller.repository.VolatileContentRepository`.
-|`nifi.content.claim.max.appendable.size`|The maximum size for a content claim. The default value is `1 MB`.
-|`nifi.content.claim.max.flow.files`| The max amount of claims to keep open for writing. The default value is `100`
+|`nifi.content.claim.max.appendable.size`|When NiFi processes many small FlowFiles, their contents are stored in the content repository, but the content of each individual FlowFile is not
+stored as a separate file. Writing each 120-byte FlowFile, for instance, to its own file would be very detrimental to performance. Instead,
+NiFi continues writing to the same file until it reaches some threshold. This property configures that threshold. Setting the value too small can result in poor performance due to reading from and
+writing to too many files. However, a file can only be deleted from the content repository once there are no longer any FlowFiles pointing to it. Therefore, setting the value too large can result
+in data remaining in the content repository for much longer, potentially leading to the content repository running out of disk space. The default value is `50 KB`.
 |`nifi.content.repository.directory.default`*|The location of the Content Repository. The default value is `./content_repository`. +
 +
 *NOTE*: Multiple content repositories can be specified by using the `nifi.content.repository.directory.` prefix with unique suffixes and separate paths as values. +
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index 6693143..f838db7 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -243,6 +243,13 @@ public interface ContentRepository {
     long size(ContentClaim claim) throws IOException;
 
     /**
+     * @param claim to get size of
+     * @return size in bytes of the file/object backing the given resource claim, or 0 if this operation is not supported by the implementation
+     * @throws IOException if size check failed
+     */
+    long size(ResourceClaim claim) throws IOException;
+
+    /**
      * Provides access to the input stream for the given claim
      *
      * @param claim to read from
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 500bab7..689487f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -908,6 +908,16 @@ public class FileSystemRepository implements ContentRepository {
     }
 
     @Override
+    public long size(final ResourceClaim claim) throws IOException {
+        final Path path = getPath(claim);
+        if (path == null) {
+            return 0L;
+        }
+
+        return Files.size(path);
+    }
+
+    @Override
     public InputStream read(final ResourceClaim claim) throws IOException {
         if (claim == null) {
             return new ByteArrayInputStream(new byte[0]);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index eb2a2db..03f8ce0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -16,6 +16,20 @@
  */
 package org.apache.nifi.controller.repository;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
+import org.apache.nifi.controller.repository.io.MemoryManager;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.ByteArrayInputStream;
 import java.io.FileInputStream;
 import java.io.FilterOutputStream;
@@ -39,19 +53,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.io.ArrayManagedOutputStream;
-import org.apache.nifi.controller.repository.io.MemoryManager;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * <p>
@@ -459,6 +460,11 @@ public class VolatileContentRepository implements ContentRepository {
     }
 
     @Override
+    public long size(final ResourceClaim claim) throws IOException {
+        return 0;
+    }
+
+    @Override
     public InputStream read(final ContentClaim claim) throws IOException {
         if (claim == null) {
             return new ByteArrayInputStream(new byte[0]);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
index ca175df..de52b0d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ContentRepositoryScanTask.java
@@ -16,7 +16,10 @@
  */
 package org.apache.nifi.diagnostics.bootstrap.tasks;
 
+import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -26,11 +29,16 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.diagnostics.DiagnosticTask;
 import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
 import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
+import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -63,6 +71,10 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
 
         final List<String> details = new ArrayList<>();
 
+        final Map<String, RetainedFileSet> retainedFileSetsByQueue = new HashMap<>();
+        final FlowManager flowManager = flowController.getFlowManager();
+
+        final NumberFormat numberFormat = NumberFormat.getNumberInstance();
         for (final String containerName : contentRepository.getContainerNames()) {
             try {
                 final Set<ResourceClaim> resourceClaims = contentRepository.getActiveResourceClaims(containerName);
@@ -77,8 +89,22 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
                     final Set<ResourceClaimReference> references = referenceMap == null ? Collections.emptySet() : referenceMap.getOrDefault(resourceClaim, Collections.emptySet());
 
                     final String path = resourceClaim.getContainer() + "/" + resourceClaim.getSection() + "/" + resourceClaim.getId();
-                    details.add(String.format("%1$s, Claimant Count = %2$d, In Use = %3$b, Awaiting Destruction = %4$b, References (%5$d) = %6$s",
-                        path, claimCount, inUse, destructable,  references.size(), references.toString()));
+                    final long fileSize = contentRepository.size(resourceClaim);
+                    details.add(String.format("%1$s; Size = %2$s bytes; Claimant Count = %3$d; In Use = %4$b; Awaiting Destruction = %5$b; References (%6$d) = %7$s",
+                        path, numberFormat.format(fileSize), claimCount, inUse, destructable,  references.size(), references));
+
+                    for (final ResourceClaimReference claimReference : references) {
+                        final String queueId = claimReference.getQueueIdentifier();
+                        final Connection connection = flowManager.getConnection(queueId);
+                        QueueSize queueSize = new QueueSize(0, 0L);
+                        if (connection != null) {
+                            queueSize = connection.getFlowFileQueue().size();
+                        }
+
+                        final RetainedFileSet retainedFileSet = retainedFileSetsByQueue.computeIfAbsent(queueId, RetainedFileSet::new);
+                        retainedFileSet.addFile(path, fileSize);
+                        retainedFileSet.setQueueSize(queueSize);
+                    }
                 }
             } catch (final Exception e) {
                 logger.error("Failed to obtain listing of Active Resource Claims for container {}", containerName, e);
@@ -100,6 +126,60 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
             }
         }
 
+        details.add("");
+
+        final List<RetainedFileSet> retainedFileSets = new ArrayList<>(retainedFileSetsByQueue.values());
+        retainedFileSets.sort(Comparator.comparing(RetainedFileSet::getByteCount).reversed());
+        details.add("The following queues retain data in the Content Repository:");
+        if (retainedFileSets.isEmpty()) {
+            details.add("No queues retain any files in the Content Repository");
+        } else {
+            for (final RetainedFileSet retainedFileSet : retainedFileSets) {
+                final String formatted = String.format("Queue ID = %s; Queue Size = %s FlowFiles / %s; Retained Files = %d; Retained Size = %s bytes (%s)",
+                    retainedFileSet.getQueueId(), numberFormat.format(retainedFileSet.getQueueSize().getObjectCount()), FormatUtils.formatDataSize(retainedFileSet.getQueueSize().getByteCount()),
+                    retainedFileSet.getFilenames().size(), numberFormat.format(retainedFileSet.getByteCount()), FormatUtils.formatDataSize(retainedFileSet.getByteCount()));
+
+                details.add(formatted);
+            }
+        }
+
         return new StandardDiagnosticsDumpElement("Content Repository Scan", details);
     }
+
+    private static class RetainedFileSet {
+        private final String queueId;
+        private final Set<String> filenames = new HashSet<>();
+        private long byteCount;
+        private QueueSize queueSize;
+
+        public RetainedFileSet(final String queueId) {
+            this.queueId = queueId;
+        }
+
+        public String getQueueId() {
+            return queueId;
+        }
+
+        public void addFile(final String filename, final long bytes) {
+            if (filenames.add(filename)) {
+                byteCount += bytes;
+            }
+        }
+
+        public Set<String> getFilenames() {
+            return filenames;
+        }
+
+        public long getByteCount() {
+            return byteCount;
+        }
+
+        public QueueSize getQueueSize() {
+            return queueSize;
+        }
+
+        public void setQueueSize(final QueueSize queueSize) {
+            this.queueSize = queueSize;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index d73b081..65b6ac9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -3431,6 +3431,11 @@ public class StandardProcessSessionIT {
         }
 
         @Override
+        public long size(final ResourceClaim claim) throws IOException {
+            return Files.size(getPath(claim));
+        }
+
+        @Override
         public InputStream read(ContentClaim claim) throws IOException {
             if (disableRead) {
                 throw new IOException("Reading from repo is disabled by unit test");
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index df08858..f2054fa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -64,7 +64,7 @@
         <nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
 
         <nifi.content.repository.implementation>org.apache.nifi.controller.repository.FileSystemRepository</nifi.content.repository.implementation>
-        <nifi.content.claim.max.appendable.size>1 MB</nifi.content.claim.max.appendable.size>
+        <nifi.content.claim.max.appendable.size>50 KB</nifi.content.claim.max.appendable.size>
         <nifi.content.repository.directory.default>./content_repository</nifi.content.repository.directory.default>
         <nifi.content.repository.archive.max.retention.period>7 days</nifi.content.repository.archive.max.retention.period>
         <nifi.content.repository.archive.max.usage.percentage>50%</nifi.content.repository.archive.max.usage.percentage>
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
index 6415893..39baaea 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
@@ -209,6 +209,11 @@ public class ByteArrayContentRepository implements ContentRepository {
     }
 
     @Override
+    public long size(final ResourceClaim claim) throws IOException {
+        return 0;
+    }
+
+    @Override
     public InputStream read(final ContentClaim claim) {
         final ByteArrayContentClaim byteArrayContentClaim = verifyClaim(claim);
         return byteArrayContentClaim.read();
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java
index e436748..a0dfdd2 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessFileSystemContentRepository.java
@@ -245,6 +245,11 @@ public class StatelessFileSystemContentRepository implements ContentRepository {
     }
 
     @Override
+    public long size(final ResourceClaim claim) throws IOException {
+        return 0;
+    }
+
+    @Override
     public InputStream read(final ContentClaim claim) throws IOException {
         if (claim == null) {
             return new ByteArrayInputStream(new byte[0]);
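
The per-queue accounting that this commit adds to ContentRepositoryScanTask can be illustrated without any NiFi classes. The sketch below is a standalone approximation, not the committed code: `RetainedByQueueSketch`, `Reference`, and `retainedBytesByQueue` are hypothetical names. It mirrors two behaviours visible in the diff: a file is counted once per queue even when several references from that queue point to it, and queues are listed from largest to smallest retained size.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Standalone approximation of the per-queue retention summary; not the committed code. */
final class RetainedByQueueSketch {

    /** Hypothetical stand-in for a reference from a queue to a backing file of a known size. */
    static final class Reference {
        final String queueId;
        final String path;
        final long fileSize;

        Reference(final String queueId, final String path, final long fileSize) {
            this.queueId = queueId;
            this.path = path;
            this.fileSize = fileSize;
        }
    }

    /** Sums the size of each distinct file per queue, sorted by retained bytes, largest first. */
    static List<Map.Entry<String, Long>> retainedBytesByQueue(final List<Reference> references) {
        final Map<String, Set<String>> pathsByQueue = new HashMap<>();
        final Map<String, Long> bytesByQueue = new HashMap<>();

        for (final Reference reference : references) {
            final Set<String> paths = pathsByQueue.computeIfAbsent(reference.queueId, id -> new HashSet<>());

            // Set.add returns false for a path already seen, so each file is counted once per queue.
            if (paths.add(reference.path)) {
                bytesByQueue.merge(reference.queueId, reference.fileSize, Long::sum);
            }
        }

        final List<Map.Entry<String, Long>> sorted = new ArrayList<>(bytesByQueue.entrySet());
        sorted.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return sorted;
    }

    public static void main(final String[] args) {
        final List<Reference> references = new ArrayList<>();
        references.add(new Reference("queue-a", "default/1/100", 1_000L));
        references.add(new Reference("queue-a", "default/1/100", 1_000L)); // same file, counted once
        references.add(new Reference("queue-a", "default/1/101", 500L));
        references.add(new Reference("queue-b", "default/1/100", 1_000L));

        for (final Map.Entry<String, Long> entry : retainedBytesByQueue(references)) {
            System.out.println("Queue ID = " + entry.getKey() + "; Retained Size = " + entry.getValue() + " bytes");
        }
    }
}
```

Note that a file shared by two queues is attributed in full to each of them, so the per-queue totals can add up to more than the space actually used on disk.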