You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") was not preserved in this copy.
Posted to commits@nifi.apache.org by mt...@apache.org on 2020/05/25 20:22:50 UTC
[nifi] branch master updated: NIFI-6785 Support Deflate Compression
NIFI-6785 Remove unused imports
This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 101387b NIFI-6785 Support Deflate Compression NIFI-6785 Remove unused imports
101387b is described below
commit 101387bfaac1397add03af0a6967cda393970eea
Author: adyoun2 <ad...@gchq.gov.uk>
AuthorDate: Thu Oct 17 14:28:59 2019 +0100
NIFI-6785 Support Deflate Compression
NIFI-6785 Remove unused imports
This closes #3822
Signed-off-by: Mike Thomsen <mt...@apache.org>
---
.../nifi/processors/standard/CompressContent.java | 35 +++++++++++++++-----
.../processors/standard/TestCompressContent.java | 36 +++++++++++++++++++++
.../resources/CompressedData/SampleFile.txt.zlib | Bin 0 -> 309 bytes
3 files changed, 63 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 7156724..8c7693b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -31,9 +31,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import lzma.sdk.lzma.Decoder;
-import lzma.streams.LzmaInputStream;
-import lzma.streams.LzmaOutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
@@ -70,11 +71,15 @@ import org.xerial.snappy.SnappyFramedOutputStream;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;
+import lzma.sdk.lzma.Decoder;
+import lzma.streams.LzmaInputStream;
+import lzma.streams.LzmaOutputStream;
+
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed", "lz4-framed"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed", "lz4-framed", "deflate"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate. This processor operates in a very memory efficient way so very large objects well beyond the heap size "
+ "are generally fine to process")
@@ -88,6 +93,7 @@ public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute";
public static final String COMPRESSION_FORMAT_GZIP = "gzip";
+ public static final String COMPRESSION_FORMAT_DEFLATE = "deflate";
public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
public static final String COMPRESSION_FORMAT_LZMA = "lzma";
@@ -100,8 +106,8 @@ public class CompressContent extends AbstractProcessor {
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("Compression Format")
- .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
- .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2,
+ .description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
+ .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED,
COMPRESSION_FORMAT_LZ4_FRAMED)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
@@ -109,7 +115,7 @@ public class CompressContent extends AbstractProcessor {
.build();
public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
.name("Compression Level")
- .description("The compression level to use; this is valid only when using gzip or xz-lzma2 compression. A lower value results in faster processing "
+ .description("The compression level to use; this is valid only when using gzip, deflate or xz-lzma2 compression. A lower value results in faster processing "
+ "but less compression; a value of 0 indicates no (that is, simple archiving) for gzip or minimal for xz-lzma2 compression."
+ " Higher levels can mean much larger memory usage such as the case with levels 7-9 for xz-lzma/2 so be careful relative to heap size.")
.defaultValue("1")
@@ -162,6 +168,8 @@ public class CompressContent extends AbstractProcessor {
final Map<String, String> mimeTypeMap = new HashMap<>();
mimeTypeMap.put("application/gzip", COMPRESSION_FORMAT_GZIP);
mimeTypeMap.put("application/x-gzip", COMPRESSION_FORMAT_GZIP);
+ mimeTypeMap.put("application/deflate", COMPRESSION_FORMAT_DEFLATE);
+ mimeTypeMap.put("application/x-deflate", COMPRESSION_FORMAT_DEFLATE);
mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2);
mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2);
mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
@@ -219,6 +227,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_GZIP:
fileExtension = ".gz";
break;
+ case COMPRESSION_FORMAT_DEFLATE:
+ fileExtension = ".zlib";
+ break;
case COMPRESSION_FORMAT_LZMA:
fileExtension = ".lzma";
break;
@@ -258,10 +269,15 @@ public class CompressContent extends AbstractProcessor {
switch (compressionFormat.toLowerCase()) {
case COMPRESSION_FORMAT_GZIP:
- final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
+ int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
mimeTypeRef.set("application/gzip");
break;
+ case COMPRESSION_FORMAT_DEFLATE:
+ compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
+ compressionOut = new DeflaterOutputStream(bufferedOut, new Deflater(compressionLevel));
+ mimeTypeRef.set("application/gzip");
+ break;
case COMPRESSION_FORMAT_LZMA:
compressionOut = new LzmaOutputStream.Builder(bufferedOut).build();
mimeTypeRef.set("application/x-lzma");
@@ -305,6 +321,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_GZIP:
compressionIn = new GzipCompressorInputStream(bufferedIn, true);
break;
+ case COMPRESSION_FORMAT_DEFLATE:
+ compressionIn = new InflaterInputStream(bufferedIn);
+ break;
case COMPRESSION_FORMAT_SNAPPY:
compressionIn = new SnappyInputStream(bufferedIn);
break;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 9ed971d..816e307 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -221,6 +221,42 @@ public class TestCompressContent {
flowFile.assertAttributeEquals("filename", "SampleFile.txt");
}
+
+ /** Decompresses the zlib fixture and expects the original sample text and a filename without ".zlib". */
+ @Test
+ public void testDeflateDecompress() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, "decompress");
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, "deflate");
+ assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.zlib"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+ }
+
+
+ /**
+ * Compresses the sample text with deflate at level 6 and checks that ".zlib" is appended to the filename.
+ * Correctness is verified by decompressing the output with a second CompressContent instance and comparing
+ * it with the original text. The compressed bytes are not compared with a fixture, because deflate output
+ * may differ between the zlib implementations/versions a JDK is built with.
+ */
+ @Test
+ public void testDeflateCompress() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, "compress");
+ runner.setProperty(CompressContent.COMPRESSION_LEVEL, "6");
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, "deflate");
+ assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ final MockFlowFile compressed = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ compressed.assertAttributeEquals("filename", "SampleFile.txt.zlib");
+
+ // Round-trip the compressed content instead of matching the stored fixture byte-for-byte.
+ final TestRunner decompressRunner = TestRunners.newTestRunner(CompressContent.class);
+ decompressRunner.setProperty(CompressContent.MODE, "decompress");
+ decompressRunner.setProperty(CompressContent.COMPRESSION_FORMAT, "deflate");
+ decompressRunner.enqueue(compressed.toByteArray());
+ decompressRunner.run();
+
+ decompressRunner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ final MockFlowFile restored = decompressRunner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ restored.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ }
+
@Test
public void testFilenameUpdatedOnCompress() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zlib b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zlib
new file mode 100644
index 0000000..70a2896
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zlib differ