Posted to commits@nifi.apache.org by jo...@apache.org on 2020/09/07 23:55:07 UTC
[nifi] 01/02: NIFI-7506 add snappy-hadoop to CompressContent
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.12.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit d18c52041504bddddd6d4efeb05d2fd1b7918709
Author: noedetore <no...@detorit.biz>
AuthorDate: Mon Aug 31 12:24:34 2020 -0400
NIFI-7506 add snappy-hadoop to CompressContent
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4321.
---
.../nifi/processors/standard/CompressContent.java | 39 ++++++++++++++++++---
.../processors/standard/TestCompressContent.java | 40 ++++++++++++++++++----
2 files changed, 68 insertions(+), 11 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 8c7693b..2fdb4ba 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -51,6 +52,9 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@@ -68,6 +72,7 @@ import org.tukaani.xz.XZInputStream;
import org.tukaani.xz.XZOutputStream;
import org.xerial.snappy.SnappyFramedInputStream;
import org.xerial.snappy.SnappyFramedOutputStream;
+import org.xerial.snappy.SnappyHadoopCompatibleOutputStream;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;
@@ -79,7 +84,7 @@ import lzma.streams.LzmaOutputStream;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed", "lz4-framed", "deflate"})
+@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate. This processor operates in a very memory efficient way so very large objects well beyond the heap size "
+ "are generally fine to process")
@@ -98,6 +103,7 @@ public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
public static final String COMPRESSION_FORMAT_LZMA = "lzma";
public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
+ public static final String COMPRESSION_FORMAT_SNAPPY_HADOOP = "snappy-hadoop";
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
@@ -106,9 +112,9 @@ public class CompressContent extends AbstractProcessor {
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("Compression Format")
- .description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Framed, and LZ4-Framed")
+ .description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
- COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED,
+ COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_HADOOP, COMPRESSION_FORMAT_SNAPPY_FRAMED,
COMPRESSION_FORMAT_LZ4_FRAMED)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
@@ -124,7 +130,7 @@ public class CompressContent extends AbstractProcessor {
.build();
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
- .description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'")
+ .description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'.")
.allowableValues(MODE_COMPRESS, MODE_DECOMPRESS)
.defaultValue(MODE_COMPRESS)
.required(true)
@@ -174,6 +180,7 @@ public class CompressContent extends AbstractProcessor {
mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2);
mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY);
+ mimeTypeMap.put("application/x-snappy-hadoop", COMPRESSION_FORMAT_SNAPPY_HADOOP);
mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED);
mimeTypeMap.put("application/x-lz4-framed", COMPRESSION_FORMAT_LZ4_FRAMED);
this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
@@ -190,6 +197,21 @@ public class CompressContent extends AbstractProcessor {
}
@Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
+ final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
+
+ final Validator rateValidator;
+ if (context.getProperty(COMPRESSION_FORMAT).getValue().toLowerCase().equals(COMPRESSION_FORMAT_SNAPPY_HADOOP)
+ && context.getProperty(MODE).getValue().toLowerCase().equals(MODE_DECOMPRESS)) {
+ validationResults.add(new ValidationResult.Builder().subject(COMPRESSION_FORMAT.getName())
+ .explanation("<Compression Format> set to <snappy-hadoop> and <MODE> set to <decompress> is not permitted. " +
+ "Data that is compressed with Snappy Hadoop can not be decompressed using this processor.")
+ .build());
+ }
+ return validationResults;
+ }
+
+ @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
@@ -242,6 +264,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_SNAPPY:
fileExtension = ".snappy";
break;
+ case COMPRESSION_FORMAT_SNAPPY_HADOOP:
+ fileExtension = ".snappy";
+ break;
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
fileExtension = ".sz";
break;
@@ -291,6 +316,10 @@ public class CompressContent extends AbstractProcessor {
compressionOut = new SnappyOutputStream(bufferedOut);
mimeTypeRef.set("application/x-snappy");
break;
+ case COMPRESSION_FORMAT_SNAPPY_HADOOP:
+ compressionOut = new SnappyHadoopCompatibleOutputStream(bufferedOut);
+ mimeTypeRef.set("application/x-snappy-hadoop");
+ break;
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
compressionOut = new SnappyFramedOutputStream(bufferedOut);
mimeTypeRef.set("application/x-snappy-framed");
@@ -327,6 +356,8 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_SNAPPY:
compressionIn = new SnappyInputStream(bufferedIn);
break;
+ case COMPRESSION_FORMAT_SNAPPY_HADOOP:
+ throw new Exception("Cannot decompress snappy-hadoop.");
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
compressionIn = new SnappyFramedInputStream(bufferedIn);
break;
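
For reference, the new compression path simply wraps the FlowFile's buffered output stream in snappy-java's SnappyHadoopCompatibleOutputStream, as the hunk above shows. A minimal standalone sketch of that library call, outside of NiFi, could look like the following; the file names and class name are illustrative only and are not part of the commit:

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.xerial.snappy.SnappyHadoopCompatibleOutputStream;

    public class SnappyHadoopExample {
        public static void main(String[] args) throws Exception {
            // Wrap the destination stream; data is written in the block layout
            // Hadoop's SnappyCodec expects, rather than snappy-java's own framing.
            try (InputStream in = new FileInputStream("SampleFile.txt");
                 OutputStream out = new SnappyHadoopCompatibleOutputStream(
                         new FileOutputStream("SampleFile.txt.snappy"))) {
                final byte[] buffer = new byte[8192];
                int read;
                while ((read = in.read(buffer)) != -1) {
                    out.write(buffer, 0, read);
                }
            }
        }
    }

snappy-java does not appear to provide a matching Hadoop-compatible input stream, which is why the customValidate() added above rejects the snappy-hadoop format in decompress mode rather than attempting to read such data back.
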
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
index 816e307..a87581d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java
@@ -16,18 +16,18 @@
*/
package org.apache.nifi.processors.standard;
-import static org.junit.Assert.assertTrue;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
public class TestCompressContent {
@@ -64,6 +64,32 @@ public class TestCompressContent {
}
@Test
+ public void testSnappyHadoopCompress() throws Exception {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_HADOOP);
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+ runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy-hadoop");
+ flowFile.assertAttributeEquals("filename", "SampleFile.txt.snappy");
+ }
+
+ @Test
+ public void testSnappyHadoopDecompress() {
+ final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
+ runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
+ runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_HADOOP);
+ runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
+
+ runner.assertNotValid();
+ }
+
+ @Test
public void testSnappyFramedCompress() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
@@ -210,7 +236,7 @@ public class TestCompressContent {
runner.clearTransferState();
runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_ATTRIBUTE);
- Map<String,String> attributes = new HashMap<String,String>();
+ Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/x-gzip");
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.gz"), attributes);
runner.run();
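
To illustrate the interoperability the new format is aimed at, here is a hedged sketch of reading the processor's snappy-hadoop output back with Hadoop's own codec. It assumes hadoop-common with a working SnappyCodec is available on the classpath; the file name and class name are illustrative only and do not come from the commit:

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.SnappyCodec;

    public class ReadSnappyHadoopOutput {
        public static void main(String[] args) throws Exception {
            // SnappyCodec is Configurable, so give it a Configuration before use.
            final SnappyCodec codec = new SnappyCodec();
            codec.setConf(new Configuration());

            // Decompress a file produced by CompressContent in snappy-hadoop mode.
            try (InputStream in = codec.createInputStream(
                         new FileInputStream("SampleFile.txt.snappy"));
                 BufferedReader reader = new BufferedReader(
                         new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }
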