You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/10/07 15:39:33 UTC
[nifi] branch main updated: NIFI-10273 Supported file entries larger than 8.5GB for TAR in MergeContent
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 572799a201 NIFI-10273 Supported file entries larger than 8.5GB for TAR in MergeContent
572799a201 is described below
commit 572799a201df4f440db18aee6585e731dee5bb34
Author: Michael 81877 <mi...@gchq.gov.uk>
AuthorDate: Tue Sep 6 15:10:32 2022 +0100
NIFI-10273 Supported file entries larger than 8.5GB for TAR in MergeContent
This closes #6369
Signed-off-by: David Handermann <ex...@apache.org>
---
.../apache/nifi/processors/standard/MergeContent.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 8b223fadb4..a4602de2cf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -91,6 +92,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -771,7 +773,6 @@ public class MergeContent extends BinFiles {
public FlowFile merge(final Bin bin, final ProcessContext context) {
final List<FlowFile> contents = bin.getContents();
final ProcessSession session = bin.getSession();
-
final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions
@@ -782,7 +783,12 @@ public class MergeContent extends BinFiles {
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
+
out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+ // if any FlowFile is at least as large as the default maximum tar entry size (TarConstants.MAXSIZE), switch to POSIX big-number mode to handle it
+ if (getMaxEntrySize(contents) >= TarConstants.MAXSIZE) {
+ out.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
+ }
for (final FlowFile flowFile : contents) {
final String path = keepPath ? getPath(flowFile) : "";
final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
@@ -827,6 +833,14 @@ public class MergeContent extends BinFiles {
return bundle;
}
+ /**
+  * Finds the size of the largest FlowFile in the given list.
+  *
+  * @param contents FlowFiles whose sizes are inspected
+  * @return the largest value reported by {@code getSize()}, or 0 when the list is empty
+  */
+ private long getMaxEntrySize(final List<FlowFile> contents) {
+     // Sequential on purpose: a max over an in-memory list is too cheap to be
+     // worth dispatching onto the shared fork-join common pool.
+     final OptionalLong maxSize = contents.stream()
+             .mapToLong(FlowFile::getSize)
+             .max();
+     return maxSize.orElse(0L);
+ }
+
@Override
public String getMergedContentType() {
return "application/tar";