You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/09/07 23:55:07 UTC

[nifi] 01/02: NIFI-7506 add snappy-hadoop to CompressContent

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.12.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit d18c52041504bddddd6d4efeb05d2fd1b7918709
Author: noedetore <no...@detorit.biz>
AuthorDate: Mon Aug 31 12:24:34 2020 -0400

    NIFI-7506 add snappy-hadoop to CompressContent
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4321.
---
 .../nifi/processors/standard/CompressContent.java  | 39 ++++++++++++++++++---
 .../processors/standard/TestCompressContent.java   | 40 ++++++++++++++++++----
 2 files changed, 68 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 8c7693b..2fdb4ba 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,6 +52,9 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
@@ -68,6 +72,7 @@ import org.tukaani.xz.XZInputStream;
 import org.tukaani.xz.XZOutputStream;
 import org.xerial.snappy.SnappyFramedInputStream;
 import org.xerial.snappy.SnappyFramedOutputStream;
+import org.xerial.snappy.SnappyHadoopCompatibleOutputStream;
 import org.xerial.snappy.SnappyInputStream;
 import org.xerial.snappy.SnappyOutputStream;
 
@@ -79,7 +84,7 @@ import lzma.streams.LzmaOutputStream;
 @SideEffectFree
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed", "lz4-framed", "deflate"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate"})
 @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
     + "attribute as appropriate. This processor operates in a very memory efficient way so very large objects well beyond the heap size "
     + "are generally fine to process")
@@ -98,6 +103,7 @@ public class CompressContent extends AbstractProcessor {
     public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
     public static final String COMPRESSION_FORMAT_LZMA = "lzma";
     public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
+    public static final String COMPRESSION_FORMAT_SNAPPY_HADOOP = "snappy-hadoop";
     public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
     public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
 
@@ -106,9 +112,9 @@ public class CompressContent extends AbstractProcessor {
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
     .name("Compression Format")
-    .description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
+    .description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed")
     .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
-            COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED,
+            COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_HADOOP, COMPRESSION_FORMAT_SNAPPY_FRAMED,
             COMPRESSION_FORMAT_LZ4_FRAMED)
     .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
     .required(true)
@@ -124,7 +130,7 @@ public class CompressContent extends AbstractProcessor {
         .build();
     public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
     .name("Mode")
-    .description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'")
+    .description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'.")
     .allowableValues(MODE_COMPRESS, MODE_DECOMPRESS)
     .defaultValue(MODE_COMPRESS)
     .required(true)
@@ -174,6 +180,7 @@ public class CompressContent extends AbstractProcessor {
         mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2);
         mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
         mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY);
+        mimeTypeMap.put("application/x-snappy-hadoop", COMPRESSION_FORMAT_SNAPPY_HADOOP);
         mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED);
         mimeTypeMap.put("application/x-lz4-framed", COMPRESSION_FORMAT_LZ4_FRAMED);
         this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
@@ -190,6 +197,21 @@ public class CompressContent extends AbstractProcessor {
     }
 
     @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+        final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
+
+        // snappy-hadoop is compress-only: the decompress path has no matching input stream, so reject that combination up front.
+        if (COMPRESSION_FORMAT_SNAPPY_HADOOP.equalsIgnoreCase(context.getProperty(COMPRESSION_FORMAT).getValue())
+                && MODE_DECOMPRESS.equalsIgnoreCase(context.getProperty(MODE).getValue())) {
+            validationResults.add(new ValidationResult.Builder().subject(COMPRESSION_FORMAT.getName())
+                    .explanation("<Compression Format> set to <snappy-hadoop> and <Mode> set to <decompress> is not permitted. " +
+                            "Data that is compressed with Snappy Hadoop cannot be decompressed using this processor.")
+                    .build());
+        }
+        return validationResults;
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
@@ -242,6 +264,9 @@ public class CompressContent extends AbstractProcessor {
             case COMPRESSION_FORMAT_SNAPPY:
                 fileExtension = ".snappy";
                 break;
+            case COMPRESSION_FORMAT_SNAPPY_HADOOP:
+                fileExtension = ".snappy";
+                break;
             case COMPRESSION_FORMAT_SNAPPY_FRAMED:
                 fileExtension = ".sz";
                 break;
@@ -291,6 +316,10 @@ public class CompressContent extends AbstractProcessor {
                                     compressionOut = new SnappyOutputStream(bufferedOut);
                                     mimeTypeRef.set("application/x-snappy");
                                     break;
+                                case COMPRESSION_FORMAT_SNAPPY_HADOOP:
+                                    compressionOut = new SnappyHadoopCompatibleOutputStream(bufferedOut);
+                                    mimeTypeRef.set("application/x-snappy-hadoop");
+                                    break;
                                 case COMPRESSION_FORMAT_SNAPPY_FRAMED:
                                     compressionOut = new SnappyFramedOutputStream(bufferedOut);
                                     mimeTypeRef.set("application/x-snappy-framed");
@@ -327,6 +356,8 @@ public class CompressContent extends AbstractProcessor {
                                 case COMPRESSION_FORMAT_SNAPPY:
                                     compressionIn = new SnappyInputStream(bufferedIn);
                                     break;
+                                case COMPRESSION_FORMAT_SNAPPY_HADOOP:
+                                    throw new Exception("Cannot decompress snappy-hadoop.");
                                 case COMPRESSION_FORMAT_SNAPPY_FRAMED:
                                     compressionIn = new SnappyFramedInputStream(bufferedIn);
                                     break;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 816e307..a87581d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -16,18 +16,18 @@
  */
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertTrue;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
 
 public class TestCompressContent {
 
@@ -64,6 +64,32 @@ public class TestCompressContent {
     }
 
     @Test
+    public void testSnappyHadoopCompress() throws Exception {
+        final TestRunner testRunner = TestRunners.newTestRunner(CompressContent.class);
+        testRunner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+        testRunner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_HADOOP);
+        testRunner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
+        // Compress the plain-text sample and check the snappy-hadoop MIME type and file extension.
+        testRunner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        final MockFlowFile compressed = testRunner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        compressed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy-hadoop");
+        compressed.assertAttributeEquals("filename", "SampleFile.txt.snappy"); // same .snappy extension as plain snappy
+    }
+
+    @Test
+    public void testSnappyHadoopDecompress() {
+        final TestRunner testRunner = TestRunners.newTestRunner(CompressContent.class);
+        testRunner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+        testRunner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_HADOOP);
+        testRunner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
+        // snappy-hadoop is compress-only, so customValidate must reject decompress mode.
+        testRunner.assertNotValid();
+    }
+
+    @Test
     public void testSnappyFramedCompress() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
         runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
@@ -210,7 +236,7 @@ public class TestCompressContent {
 
         runner.clearTransferState();
         runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_ATTRIBUTE);
-        Map<String,String> attributes = new HashMap<String,String>();
+        Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), "application/x-gzip");
         runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.gz"), attributes);
         runner.run();