You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by af...@apache.org on 2017/03/02 11:42:15 UTC
nifi git commit: NIFI-1797 - Added compression codec property to
CreateHadoopSequenceFile processor
Repository: nifi
Updated Branches:
refs/heads/master a32e1509b -> dcdfd3dad
NIFI-1797 - Added compression codec property to CreateHadoopSequenceFile processor
This closes: #1387
Signed-off-by: Andre F de Miranda <tr...@users.noreply.github.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dcdfd3da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dcdfd3da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dcdfd3da
Branch: refs/heads/master
Commit: dcdfd3dad9f02ec8d85a3e1da2f82c30e20b6dae
Parents: a32e150
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Jan 3 20:47:56 2017 +0100
Committer: Andre F de Miranda <tr...@users.noreply.github.com>
Committed: Thu Mar 2 22:19:34 2017 +1100
----------------------------------------------------------------------
.../hadoop/CreateHadoopSequenceFile.java | 28 +++-
.../hadoop/SequenceFileWriterImpl.java | 8 +-
.../hadoop/util/SequenceFileWriter.java | 4 +-
.../hadoop/TestCreateHadoopSequenceFile.java | 149 ++++++++++++++++++-
4 files changed, 176 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/dcdfd3da/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index 7770152..20da921 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -16,7 +16,9 @@
*/
package org.apache.nifi.processors.hadoop;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -31,12 +33,14 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
+import org.apache.nifi.util.StopWatch;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* <p>
@@ -88,7 +92,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
}
// Optional Properties.
static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
- .name("compression type")
+ .name("Compression type")
.description("Type of compression to use when creating Sequence File")
.allowableValues(SequenceFile.CompressionType.values())
.build();
@@ -105,6 +109,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> someProps = new ArrayList<>(properties);
someProps.add(COMPRESSION_TYPE);
+ someProps.add(COMPRESSION_CODEC);
return someProps;
}
@@ -149,13 +154,28 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
default:
sequenceFileWriter = new SequenceFileWriterImpl();
}
- String value = context.getProperty(COMPRESSION_TYPE).getValue();
- SequenceFile.CompressionType compressionType = value == null
+
+ final Configuration configuration = getConfiguration();
+ if (configuration == null) {
+ getLogger().error("HDFS not configured properly");
+ session.transfer(flowFile, RELATIONSHIP_FAILURE);
+ context.yield();
+ return;
+ }
+
+ final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+ final String value = context.getProperty(COMPRESSION_TYPE).getValue();
+ final SequenceFile.CompressionType compressionType = value == null
? SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);
+
final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
+
try {
- flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType);
+ StopWatch stopWatch = new StopWatch(true);
+ flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, configuration, compressionType, codec);
+ session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
} catch (ProcessException e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/dcdfd3da/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
index 2c586e0..f794c3b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
@@ -32,11 +32,11 @@ import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream;
import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
-import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -48,7 +48,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {
@Override
public FlowFile writeSequenceFile(final FlowFile flowFile, final ProcessSession session,
- final Configuration configuration, final CompressionType compressionType) {
+ final Configuration configuration, final CompressionType compressionType, final CompressionCodec compressionCodec) {
if (flowFile.getSize() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Cannot write " + flowFile
@@ -97,7 +97,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {
SequenceFile.Writer.stream(fsDataOutputStream),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(InputStreamWritable.class),
- SequenceFile.Writer.compression(compressionType, new DefaultCodec()))) {
+ SequenceFile.Writer.compression(compressionType, compressionCodec))) {
processInputStream(in, flowFile, writer);
http://git-wip-us.apache.org/repos/asf/nifi/blob/dcdfd3da/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
index 6ad6461..d19e3b6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.hadoop.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
@@ -31,7 +32,8 @@ public interface SequenceFileWriter {
* @param session session
* @param configuration configuration
* @param compressionType compression type
+ * @param compressionCodec compression codec
* @return the written to SequenceFile flow file
*/
- FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType);
+ FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType, CompressionCodec compressionCodec);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/dcdfd3da/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
index 419323f..1ac62af 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
@@ -17,7 +17,10 @@
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -30,8 +33,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
@@ -49,7 +50,6 @@ import static org.mockito.Mockito.when;
public class TestCreateHadoopSequenceFile {
private TestRunner controller;
- private static Logger LOGGER;
private final File testdata = new File("src/test/resources/testdata");
private final File[] inFiles = new File[]{new File(testdata, "randombytes-1"),
@@ -61,7 +61,6 @@ public class TestCreateHadoopSequenceFile {
@BeforeClass
public static void setUpClass() {
- LOGGER = LoggerFactory.getLogger(TestCreateHadoopSequenceFile.class);
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hadoop", "debug");
}
@@ -204,6 +203,147 @@ public class TestCreateHadoopSequenceFile {
// fos.close();
}
+ @Test
+ public void testSequenceFileBzipCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.BZIP.name());
+ controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+ File inFile = inFiles[0];
+ try (FileInputStream fin = new FileInputStream(inFile) ){
+ controller.enqueue(fin);
+ }
+ controller.run();
+
+ List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+ List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+ assertEquals(0, failedFlowFiles.size());
+ assertEquals(1, successSeqFiles.size());
+
+ MockFlowFile ff = successSeqFiles.iterator().next();
+ byte[] data = ff.toByteArray();
+
+
+ final String magicHeader = new String(data, 0, 3, "UTF-8");
+ assertEquals("SEQ", magicHeader);
+ // Format of header is SEQ followed by the version (1 byte).
+ // Then, the length of the Key type (1 byte), then the Key type.
+ // Then, the length of the Value type (1 byte), then the Value type.
+ final String keyType = Text.class.getCanonicalName();
+ final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+ final int valueTypeLength = data[5 + keyType.length()];
+ final String valueType = BytesWritable.class.getCanonicalName();
+
+ assertEquals(valueType.length(), valueTypeLength);
+ assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+ final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+ final int blockCompressionIndex = compressionIndex + 1;
+
+ assertEquals(1, data[compressionIndex]);
+ assertEquals(1, data[blockCompressionIndex]);
+
+ final int codecTypeSize = data[blockCompressionIndex + 1];
+ final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+ assertEquals(BZip2Codec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+ }
+
+ @Test
+ public void testSequenceFileDefaultCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.DEFAULT.name());
+ controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+ File inFile = inFiles[0];
+ try (FileInputStream fin = new FileInputStream(inFile) ){
+ controller.enqueue(fin);
+ }
+ controller.run();
+
+ List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+ List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+ assertEquals(0, failedFlowFiles.size());
+ assertEquals(1, successSeqFiles.size());
+
+ MockFlowFile ff = successSeqFiles.iterator().next();
+ byte[] data = ff.toByteArray();
+
+
+ final String magicHeader = new String(data, 0, 3, "UTF-8");
+ assertEquals("SEQ", magicHeader);
+ // Format of header is SEQ followed by the version (1 byte).
+ // Then, the length of the Key type (1 byte), then the Key type.
+ // Then, the length of the Value type (1 byte), then the Value type.
+ final String keyType = Text.class.getCanonicalName();
+ final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+ final int valueTypeLength = data[5 + keyType.length()];
+ final String valueType = BytesWritable.class.getCanonicalName();
+
+ assertEquals(valueType.length(), valueTypeLength);
+ assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+ final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+ final int blockCompressionIndex = compressionIndex + 1;
+
+ assertEquals(1, data[compressionIndex]);
+ assertEquals(1, data[blockCompressionIndex]);
+
+ final int codecTypeSize = data[blockCompressionIndex + 1];
+ final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+ assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+ }
+
+ @Test
+ public void testSequenceFileNoneCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+ controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.NONE.name());
+ controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+ File inFile = inFiles[0];
+ try (FileInputStream fin = new FileInputStream(inFile) ){
+ controller.enqueue(fin);
+ }
+ controller.run();
+
+ List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+ List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+ assertEquals(0, failedFlowFiles.size());
+ assertEquals(1, successSeqFiles.size());
+
+ MockFlowFile ff = successSeqFiles.iterator().next();
+ byte[] data = ff.toByteArray();
+
+
+ final String magicHeader = new String(data, 0, 3, "UTF-8");
+ assertEquals("SEQ", magicHeader);
+ // Format of header is SEQ followed by the version (1 byte).
+ // Then, the length of the Key type (1 byte), then the Key type.
+ // Then, the length of the Value type (1 byte), then the Value type.
+ final String keyType = Text.class.getCanonicalName();
+ final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+ final int valueTypeLength = data[5 + keyType.length()];
+ final String valueType = BytesWritable.class.getCanonicalName();
+
+ assertEquals(valueType.length(), valueTypeLength);
+ assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+ final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+ final int blockCompressionIndex = compressionIndex + 1;
+
+ assertEquals(1, data[compressionIndex]);
+ assertEquals(1, data[blockCompressionIndex]);
+
+ final int codecTypeSize = data[blockCompressionIndex + 1];
+ final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+ assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+ }
+
private static class TestableCreateHadoopSequenceFile extends CreateHadoopSequenceFile {
private KerberosProperties testKerbersProperties;
@@ -217,4 +357,5 @@ public class TestCreateHadoopSequenceFile {
return testKerbersProperties;
}
}
+
}