You are viewing a plain text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/02/25 18:47:25 UTC

nifi git commit: NIFI-1357 Add Snappy compression to "CompressContent" Processor

Repository: nifi
Updated Branches:
  refs/heads/master 0d13de0cf -> 32f476aaa


NIFI-1357 Add Snappy compression to "CompressContent" Processor

This closes #164.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32f476aa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32f476aa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32f476aa

Branch: refs/heads/master
Commit: 32f476aaa7040ed4718713f2ed11e11bc7ef4f1c
Parents: 0d13de0
Author: Jeremy Dyer <jd...@gmail.com>
Authored: Fri Jan 8 09:54:24 2016 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Feb 25 12:47:01 2016 -0500

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   5 ++
 .../processors/standard/CompressContent.java    |  35 +++++++++-
 .../standard/TestCompressContent.java           |  65 +++++++++++++++++++
 .../CompressedData/SampleFile.txt.snappy        | Bin 0 -> 1730 bytes
 .../resources/CompressedData/SampleFile.txt.sz  | Bin 0 -> 1669 bytes
 5 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 95d140a..5494206 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -211,6 +211,11 @@ language governing permissions and limitations under the License. -->
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.2</version>
+        </dependency>
+        <dependency>
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
             <version>1.4.187</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 593cf44..3ef9746 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -64,12 +64,16 @@ import org.tukaani.xz.XZOutputStream;
 import lzma.sdk.lzma.Decoder;
 import lzma.streams.LzmaInputStream;
 import lzma.streams.LzmaOutputStream;
+import org.xerial.snappy.SnappyFramedInputStream;
+import org.xerial.snappy.SnappyFramedOutputStream;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
 
 @EventDriven
 @SideEffectFree
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed"})
 @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
     + "attribute as appropriate")
 @ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to "
@@ -83,14 +87,17 @@ public class CompressContent extends AbstractProcessor {
     public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
     public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
     public static final String COMPRESSION_FORMAT_LZMA = "lzma";
+    public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
+    public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
 
     public static final String MODE_COMPRESS = "compress";
     public static final String MODE_DECOMPRESS = "decompress";
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
     .name("Compression Format")
-    .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA")
-    .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA)
+    .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
+    .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2,
+            COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
     .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
     .required(true)
     .build();
@@ -150,6 +157,8 @@ public class CompressContent extends AbstractProcessor {
         mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2);
         mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2);
         mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
+        mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY);
+        mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED);
         this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
     }
 
@@ -210,6 +219,12 @@ public class CompressContent extends AbstractProcessor {
             case COMPRESSION_FORMAT_BZIP2:
                 fileExtension = ".bz2";
                 break;
+            case COMPRESSION_FORMAT_SNAPPY:
+                fileExtension = ".snappy";
+                break;
+            case COMPRESSION_FORMAT_SNAPPY_FRAMED:
+                fileExtension = ".sz";
+                break;
             default:
                 fileExtension = "";
                 break;
@@ -243,6 +258,14 @@ public class CompressContent extends AbstractProcessor {
                                     compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options());
                                     mimeTypeRef.set("application/x-xz");
                                     break;
+                                case COMPRESSION_FORMAT_SNAPPY:
+                                    compressionOut = new SnappyOutputStream(bufferedOut);
+                                    mimeTypeRef.set("application/x-snappy");
+                                    break;
+                                case COMPRESSION_FORMAT_SNAPPY_FRAMED:
+                                    compressionOut = new SnappyFramedOutputStream(bufferedOut);
+                                    mimeTypeRef.set("application/x-snappy-framed");
+                                    break;
                                 case COMPRESSION_FORMAT_BZIP2:
                                 default:
                                     mimeTypeRef.set("application/x-bzip2");
@@ -265,6 +288,12 @@ public class CompressContent extends AbstractProcessor {
                                 case COMPRESSION_FORMAT_GZIP:
                                     compressionIn = new GzipCompressorInputStream(bufferedIn, true);
                                     break;
+                                case COMPRESSION_FORMAT_SNAPPY:
+                                    compressionIn = new SnappyInputStream(bufferedIn);
+                                    break;
+                                case COMPRESSION_FORMAT_SNAPPY_FRAMED:
+                                    compressionIn = new SnappyFramedInputStream(bufferedIn);
+                                    break;
                                 default:
                                     compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
                             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 68cba4d..5f96036 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -23,6 +23,7 @@ import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -31,6 +32,70 @@ import org.junit.Test;
 public class TestCompressContent {
 
     @Test
+    public void testSnappyCompress() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY);
+        runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+        runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy");
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt.snappy");
+    }
+
+    @Test
+    public void testSnappyDecompress() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY);
+        runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+        runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.snappy"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+    }
+
+    @Test
+    public void testSnappyFramedCompress() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_FRAMED);
+        runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+        runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy-framed");
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt.sz");
+    }
+
+    @Test
+    public void testSnappyFramedDecompress() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+        runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
+        runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_FRAMED);
+        runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+        runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.sz"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+        flowFile.assertAttributeEquals("filename", "SampleFile.txt");
+    }
+
+    @Test
     public void testBzip2DecompressConcatenated() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
         runner.setProperty(CompressContent.MODE, "decompress");

http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy
new file mode 100644
index 0000000..60c384a
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/32f476aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz
new file mode 100644
index 0000000..1065381
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz differ