You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/10/07 15:39:33 UTC

[nifi] branch main updated: NIFI-10273 Supported file entries larger than 8.5GB for TAR in MergeContent

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 572799a201 NIFI-10273 Supported file entries larger than 8.5GB for TAR in MergeContent
572799a201 is described below

commit 572799a201df4f440db18aee6585e731dee5bb34
Author: Michael 81877 <mi...@gchq.gov.uk>
AuthorDate: Tue Sep 6 15:10:32 2022 +0100

    NIFI-10273 Supported file entries larger than 8.5GB for TAR in MergeContent
    
    This closes #6369
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../apache/nifi/processors/standard/MergeContent.java    | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 8b223fadb4..a4602de2cf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarConstants;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -91,6 +92,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -771,7 +773,6 @@ public class MergeContent extends BinFiles {
         public FlowFile merge(final Bin bin, final ProcessContext context) {
             final List<FlowFile> contents = bin.getContents();
             final ProcessSession session = bin.getSession();
-
             final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
             FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions
 
@@ -782,7 +783,12 @@ public class MergeContent extends BinFiles {
                     public void process(final OutputStream rawOut) throws IOException {
                         try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                             final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
+
                             out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+                            // if any one of the FlowFiles is larger than the default maximum tar entry size, then we set bigNumberMode to handle it
+                            if (getMaxEntrySize(contents) >= TarConstants.MAXSIZE) {
+                                out.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
+                            }
                             for (final FlowFile flowFile : contents) {
                                 final String path = keepPath ? getPath(flowFile) : "";
                                 final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
@@ -827,6 +833,14 @@ public class MergeContent extends BinFiles {
             return bundle;
         }
 
+        /** Returns the largest {@code getSize()} among {@code contents}, or 0 when the list is empty. */
+        private long getMaxEntrySize(final List<FlowFile> contents) {
+            // Sequential on purpose: a max over the bin's sizes is trivial work, so parallel() would only add ForkJoinPool overhead
+            final OptionalLong maxSize = contents.stream()
+                .mapToLong(FlowFile::getSize).max();
+            return maxSize.orElse(0L);
+        }
+
         @Override
         public String getMergedContentType() {
             return "application/tar";