You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2020/05/25 20:22:50 UTC

[nifi] branch master updated: NIFI-6785 Support Deflate Compression NIFI-6785 Remove unused imports

This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 101387b  NIFI-6785 Support Deflate Compression NIFI-6785 Remove unused imports
101387b is described below

commit 101387bfaac1397add03af0a6967cda393970eea
Author: adyoun2 <ad...@gchq.gov.uk>
AuthorDate: Thu Oct 17 14:28:59 2019 +0100

    NIFI-6785 Support Deflate Compression
    NIFI-6785 Remove unused imports
    
    This closes #3822
    
    Signed-off-by: Mike Thomsen <mt...@apache.org>
---
 .../nifi/processors/standard/CompressContent.java  |  35 +++++++++++++++-----
 .../processors/standard/TestCompressContent.java   |  36 +++++++++++++++++++++
 .../resources/CompressedData/SampleFile.txt.zlib   | Bin 0 -> 309 bytes
 3 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 7156724..8c7693b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -31,9 +31,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import lzma.sdk.lzma.Decoder;
-import lzma.streams.LzmaInputStream;
-import lzma.streams.LzmaOutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
@@ -70,11 +71,15 @@ import org.xerial.snappy.SnappyFramedOutputStream;
 import org.xerial.snappy.SnappyInputStream;
 import org.xerial.snappy.SnappyOutputStream;
 
+import lzma.sdk.lzma.Decoder;
+import lzma.streams.LzmaInputStream;
+import lzma.streams.LzmaOutputStream;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed", "lz4-framed"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed", "lz4-framed", "deflate"})
 @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
     + "attribute as appropriate. This processor operates in a very memory efficient way so very large objects well beyond the heap size "
     + "are generally fine to process")
@@ -88,6 +93,7 @@ public class CompressContent extends AbstractProcessor {
 
     public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute";
     public static final String COMPRESSION_FORMAT_GZIP = "gzip";
+    public static final String COMPRESSION_FORMAT_DEFLATE = "deflate";
     public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
     public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
     public static final String COMPRESSION_FORMAT_LZMA = "lzma";
@@ -100,8 +106,8 @@ public class CompressContent extends AbstractProcessor {
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
     .name("Compression Format")
-    .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
-    .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2,
+    .description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
+    .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
             COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED,
             COMPRESSION_FORMAT_LZ4_FRAMED)
     .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
@@ -109,7 +115,7 @@ public class CompressContent extends AbstractProcessor {
     .build();
     public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
     .name("Compression Level")
-    .description("The compression level to use; this is valid only when using gzip or xz-lzma2 compression. A lower value results in faster processing "
+    .description("The compression level to use; this is valid only when using gzip, deflate or xz-lzma2 compression. A lower value results in faster processing "
         + "but less compression; a value of 0 indicates no (that is, simple archiving) for gzip or minimal for xz-lzma2 compression."
         + " Higher levels can mean much larger memory usage such as the case with levels 7-9 for xz-lzma/2 so be careful relative to heap size.")
         .defaultValue("1")
@@ -162,6 +168,8 @@ public class CompressContent extends AbstractProcessor {
         final Map<String, String> mimeTypeMap = new HashMap<>();
         mimeTypeMap.put("application/gzip", COMPRESSION_FORMAT_GZIP);
         mimeTypeMap.put("application/x-gzip", COMPRESSION_FORMAT_GZIP);
+        mimeTypeMap.put("application/deflate", COMPRESSION_FORMAT_DEFLATE);
+        mimeTypeMap.put("application/x-deflate", COMPRESSION_FORMAT_DEFLATE);
         mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2);
         mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2);
         mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
@@ -219,6 +227,9 @@ public class CompressContent extends AbstractProcessor {
             case COMPRESSION_FORMAT_GZIP:
                 fileExtension = ".gz";
                 break;
+            case COMPRESSION_FORMAT_DEFLATE:
+                fileExtension = ".zlib";
+                break;
             case COMPRESSION_FORMAT_LZMA:
                 fileExtension = ".lzma";
                 break;
@@ -258,10 +269,15 @@ public class CompressContent extends AbstractProcessor {
 
                             switch (compressionFormat.toLowerCase()) {
                                 case COMPRESSION_FORMAT_GZIP:
-                                    final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
+                                    int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                                     compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
                                     mimeTypeRef.set("application/gzip");
                                     break;
+                                case COMPRESSION_FORMAT_DEFLATE:
+                                    compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger(); // reuses the local declared in the gzip case
+                                    compressionOut = new DeflaterOutputStream(bufferedOut, new Deflater(compressionLevel));
+                                    mimeTypeRef.set("application/deflate"); // must resolve back to deflate (not gzip) through mimeTypeMap
+                                    break;
                                 case COMPRESSION_FORMAT_LZMA:
                                     compressionOut = new LzmaOutputStream.Builder(bufferedOut).build();
                                     mimeTypeRef.set("application/x-lzma");
@@ -305,6 +321,9 @@ public class CompressContent extends AbstractProcessor {
                                 case COMPRESSION_FORMAT_GZIP:
                                     compressionIn = new GzipCompressorInputStream(bufferedIn, true);
                                     break;
+                                case COMPRESSION_FORMAT_DEFLATE:
+                                    compressionIn = new InflaterInputStream(bufferedIn);
+                                    break;
                                 case COMPRESSION_FORMAT_SNAPPY:
                                     compressionIn = new SnappyInputStream(bufferedIn);
                                     break;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 9ed971d..816e307 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -221,6 +221,42 @@ public class TestCompressContent {
         flowFile.assertAttributeEquals("filename", "SampleFile.txt");
     }
 
+    /** Decompresses the stored zlib fixture in "deflate" mode and expects the plain-text fixture back under the filename "SampleFile.txt". */
+    @Test
+    public void testDeflateDecompress() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, "decompress");
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, "deflate");
+        assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+
+        runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.zlib"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        // No debug dump of the content here: the assertions below already report any mismatch.
+        flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+    }
+
+    /** Compresses the plain-text fixture with "deflate" at level 6 and expects the stored zlib fixture bytes and a ".zlib" filename suffix. */
+    @Test
+    public void testDeflateCompress() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(CompressContent.class);
+        testRunner.setProperty(CompressContent.COMPRESSION_FORMAT, "deflate");
+        testRunner.setProperty(CompressContent.COMPRESSION_LEVEL, "6");
+        testRunner.setProperty(CompressContent.MODE, "compress");
+        assertTrue(testRunner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid());
+
+        testRunner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        final MockFlowFile compressed = testRunner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        compressed.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt.zlib"));
+        compressed.assertAttributeEquals("filename", "SampleFile.txt.zlib");
+    }
+
     @Test
     public void testFilenameUpdatedOnCompress() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zlib b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zlib
new file mode 100644
index 0000000..70a2896
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.zlib differ