Posted to commits@nifi.apache.org by af...@apache.org on 2017/03/02 11:42:15 UTC

nifi git commit: NIFI-1797 - Added compression codec property to CreateHadoopSequenceFile processor

Repository: nifi
Updated Branches:
  refs/heads/master a32e1509b -> dcdfd3dad


NIFI-1797 - Added compression codec property to CreateHadoopSequenceFile processor

This closes: #1387

Signed-off-by: Andre F de Miranda <tr...@users.noreply.github.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dcdfd3da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dcdfd3da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dcdfd3da

Branch: refs/heads/master
Commit: dcdfd3dad9f02ec8d85a3e1da2f82c30e20b6dae
Parents: a32e150
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Jan 3 20:47:56 2017 +0100
Committer: Andre F de Miranda <tr...@users.noreply.github.com>
Committed: Thu Mar 2 22:19:34 2017 +1100

----------------------------------------------------------------------
 .../hadoop/CreateHadoopSequenceFile.java        |  28 +++-
 .../hadoop/SequenceFileWriterImpl.java          |   8 +-
 .../hadoop/util/SequenceFileWriter.java         |   4 +-
 .../hadoop/TestCreateHadoopSequenceFile.java    | 149 ++++++++++++++++++-
 4 files changed, 176 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
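
For readers who want to exercise the new property, here is a minimal configuration
sketch, assuming the nifi-mock TestRunner API and the property constants exercised
in the tests below (the payload is illustrative; the real tests also inject Kerberos
properties through a testable subclass):

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Sketch only, not part of this commit: the new codec property is set
    // alongside the existing compression type property.
    TestRunner runner = TestRunners.newTestRunner(CreateHadoopSequenceFile.class);
    runner.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC,
            AbstractHadoopProcessor.CompressionType.BZIP.name());
    runner.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE,
            SequenceFile.CompressionType.BLOCK.name());
    runner.enqueue("some content".getBytes()); // illustrative payload
    runner.run();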


http://git-wip-us.apache.org/repos/asf/nifi/blob/dcdfd3da/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index 7770152..20da921 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -31,12 +33,14 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
+import org.apache.nifi.util.StopWatch;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * <p>
@@ -88,7 +92,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
     }
     // Optional Properties.
     static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
-            .name("compression type")
+            .name("Compression type")
             .description("Type of compression to use when creating Sequence File")
             .allowableValues(SequenceFile.CompressionType.values())
             .build();
@@ -105,6 +109,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         List<PropertyDescriptor> someProps = new ArrayList<>(properties);
         someProps.add(COMPRESSION_TYPE);
+        someProps.add(COMPRESSION_CODEC);
         return  someProps;
     }
 
@@ -149,13 +154,28 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
             default:
                 sequenceFileWriter = new SequenceFileWriterImpl();
         }
-        String value = context.getProperty(COMPRESSION_TYPE).getValue();
-        SequenceFile.CompressionType compressionType = value == null
+
+        final Configuration configuration = getConfiguration();
+        if (configuration == null) {
+            getLogger().error("HDFS not configured properly");
+            session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            context.yield();
+            return;
+        }
+
+        final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+        final String value = context.getProperty(COMPRESSION_TYPE).getValue();
+        final SequenceFile.CompressionType compressionType = value == null
             ? SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);
+
         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
+
         try {
-            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType);
+            StopWatch stopWatch = new StopWatch(true);
+            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, configuration, compressionType, codec);
+            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, RELATIONSHIP_SUCCESS);
             getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
         } catch (ProcessException e) {
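
The codec itself comes from getCompressionCodec(context, configuration), which is
inherited from AbstractHadoopProcessor and not shown in this diff. As a sketch of
what such a lookup can look like with Hadoop's CompressionCodecFactory (the
inherited helper presumably also maps the property's allowable values to codec
class names first):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    // Sketch: resolve a codec instance from a fully qualified class name.
    static CompressionCodec resolveCodec(final String codecClassName, final Configuration conf) {
        final CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        return factory.getCodecByClassName(codecClassName); // null if not registered
    }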

http://git-wip-us.apache.org/repos/asf/nifi/blob/dcdfd3da/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
index 2c586e0..f794c3b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessSession;
@@ -32,11 +32,11 @@ import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream;
 import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
 import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
-import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -48,7 +48,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {
 
     @Override
     public FlowFile writeSequenceFile(final FlowFile flowFile, final ProcessSession session,
-            final Configuration configuration, final CompressionType compressionType) {
+            final Configuration configuration, final CompressionType compressionType, final CompressionCodec compressionCodec) {
 
         if (flowFile.getSize() > Integer.MAX_VALUE) {
             throw new IllegalArgumentException("Cannot write " + flowFile
@@ -97,7 +97,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {
                                 SequenceFile.Writer.stream(fsDataOutputStream),
                                 SequenceFile.Writer.keyClass(Text.class),
                                 SequenceFile.Writer.valueClass(InputStreamWritable.class),
-                                SequenceFile.Writer.compression(compressionType, new DefaultCodec()))) {
+                                SequenceFile.Writer.compression(compressionType, compressionCodec))) {
 
                     processInputStream(in, flowFile, writer);
 

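With the hard-coded DefaultCodec removed, the writer compresses with whatever codec
the caller supplies. For reference, the same Writer-option style in a standalone
sketch (path and payload are made up; some codecs need a Configuration before use):

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;

    final Configuration conf = new Configuration();
    final BZip2Codec codec = new BZip2Codec();
    codec.setConf(conf); // codecs are Configurable and may need a Configuration
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(new Path("/tmp/example.sf")), // made-up path
            SequenceFile.Writer.keyClass(Text.class),
            SequenceFile.Writer.valueClass(BytesWritable.class),
            SequenceFile.Writer.compression(CompressionType.BLOCK, codec))) {
        writer.append(new Text("key"),
                new BytesWritable("value".getBytes(StandardCharsets.UTF_8)));
    }
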
http://git-wip-us.apache.org/repos/asf/nifi/blob/dcdfd3da/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
index 6ad6461..d19e3b6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.hadoop.util;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
 
@@ -31,7 +32,8 @@ public interface SequenceFileWriter {
      * @param session session
      * @param configuration configuration
      * @param compressionType compression type
+     * @param compressionCodec compression codec
      * @return the written to SequenceFile flow file
      */
-    FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType);
+    FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType, CompressionCodec compressionCodec);
 }
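
Any other SequenceFileWriter implementation has to pick up the extra parameter as
well. A trivial conforming stub, purely for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;

    // Hypothetical pass-through implementation of the widened interface.
    public class NoOpSequenceFileWriter implements SequenceFileWriter {
        @Override
        public FlowFile writeSequenceFile(final FlowFile flowFile, final ProcessSession session,
                final Configuration configuration, final CompressionType compressionType,
                final CompressionCodec compressionCodec) {
            return flowFile; // leaves the flow file content untouched
        }
    }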

http://git-wip-us.apache.org/repos/asf/nifi/blob/dcdfd3da/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
index 419323f..1ac62af 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
@@ -17,7 +17,10 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -30,8 +33,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -49,7 +50,6 @@ import static org.mockito.Mockito.when;
 public class TestCreateHadoopSequenceFile {
 
     private TestRunner controller;
-    private static Logger LOGGER;
 
     private final File testdata = new File("src/test/resources/testdata");
     private final File[] inFiles = new File[]{new File(testdata, "randombytes-1"),
@@ -61,7 +61,6 @@ public class TestCreateHadoopSequenceFile {
 
     @BeforeClass
     public static void setUpClass() {
-        LOGGER = LoggerFactory.getLogger(TestCreateHadoopSequenceFile.class);
         System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
         System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hadoop", "debug");
     }
@@ -204,6 +203,147 @@ public class TestCreateHadoopSequenceFile {
 //        fos.close();
     }
 
+    @Test
+    public void testSequenceFileBzipCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.BZIP.name());
+        controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+        File inFile = inFiles[0];
+        try (FileInputStream fin = new FileInputStream(inFile) ){
+            controller.enqueue(fin);
+        }
+        controller.run();
+
+        List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+        List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+        assertEquals(0, failedFlowFiles.size());
+        assertEquals(1, successSeqFiles.size());
+
+        MockFlowFile ff = successSeqFiles.iterator().next();
+        byte[] data = ff.toByteArray();
+
+
+        final String magicHeader = new String(data, 0, 3, "UTF-8");
+        assertEquals("SEQ", magicHeader);
+        // Format of header is SEQ followed by the version (1 byte).
+        // Then, the length of the Key type (1 byte), then the Key type
+        // Then, the length of the Value type(1 byte), then the Value type
+        final String keyType = Text.class.getCanonicalName();
+        final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+        final int valueTypeLength = data[5 + keyType.length()];
+        final String valueType = BytesWritable.class.getCanonicalName();
+
+        assertEquals(valueType.length(), valueTypeLength);
+        assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+        final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+        final int blockCompressionIndex = compressionIndex + 1;
+
+        assertEquals(1, data[compressionIndex]);
+        assertEquals(1, data[blockCompressionIndex]);
+
+        final int codecTypeSize = data[blockCompressionIndex + 1];
+        final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+        assertEquals(BZip2Codec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+    }
+
+    @Test
+    public void testSequenceFileDefaultCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.DEFAULT.name());
+        controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+        File inFile = inFiles[0];
+        try (FileInputStream fin = new FileInputStream(inFile) ){
+            controller.enqueue(fin);
+        }
+        controller.run();
+
+        List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+        List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+        assertEquals(0, failedFlowFiles.size());
+        assertEquals(1, successSeqFiles.size());
+
+        MockFlowFile ff = successSeqFiles.iterator().next();
+        byte[] data = ff.toByteArray();
+
+
+        final String magicHeader = new String(data, 0, 3, "UTF-8");
+        assertEquals("SEQ", magicHeader);
+        // Format of header is SEQ followed by the version (1 byte).
+        // Then, the length of the Key type (1 byte), then the Key type
+        // Then, the length of the Value type(1 byte), then the Value type
+        final String keyType = Text.class.getCanonicalName();
+        final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+        final int valueTypeLength = data[5 + keyType.length()];
+        final String valueType = BytesWritable.class.getCanonicalName();
+
+        assertEquals(valueType.length(), valueTypeLength);
+        assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+        final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+        final int blockCompressionIndex = compressionIndex + 1;
+
+        assertEquals(1, data[compressionIndex]);
+        assertEquals(1, data[blockCompressionIndex]);
+
+        final int codecTypeSize = data[blockCompressionIndex + 1];
+        final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+        assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+    }
+
+    @Test
+    public void testSequenceFileNoneCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.NONE.name());
+        controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+        File inFile = inFiles[0];
+        try (FileInputStream fin = new FileInputStream(inFile) ){
+            controller.enqueue(fin);
+        }
+        controller.run();
+
+        List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+        List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+        assertEquals(0, failedFlowFiles.size());
+        assertEquals(1, successSeqFiles.size());
+
+        MockFlowFile ff = successSeqFiles.iterator().next();
+        byte[] data = ff.toByteArray();
+
+
+        final String magicHeader = new String(data, 0, 3, "UTF-8");
+        assertEquals("SEQ", magicHeader);
+        // Format of header is SEQ followed by the version (1 byte).
+        // Then, the length of the Key type (1 byte), then the Key type
+        // Then, the length of the Value type(1 byte), then the Value type
+        final String keyType = Text.class.getCanonicalName();
+        final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+        final int valueTypeLength = data[5 + keyType.length()];
+        final String valueType = BytesWritable.class.getCanonicalName();
+
+        assertEquals(valueType.length(), valueTypeLength);
+        assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+        final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+        final int blockCompressionIndex = compressionIndex + 1;
+
+        assertEquals(1, data[compressionIndex]);
+        assertEquals(1, data[blockCompressionIndex]);
+
+        final int codecTypeSize = data[blockCompressionIndex + 1];
+        final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+        assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+    }
+
     private static class TestableCreateHadoopSequenceFile extends CreateHadoopSequenceFile {
 
         private KerberosProperties testKerbersProperties;
@@ -217,4 +357,5 @@ public class TestCreateHadoopSequenceFile {
             return testKerbersProperties;
         }
     }
+
 }
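
The three tests above all walk the sequence file header byte by byte. The layout
they rely on, collected into one hedged helper (assumption: each class-name length
fits in a single byte, which holds for the short class names asserted here, since
the length is written as a one-byte vint):

    import java.nio.charset.StandardCharsets;

    // Header layout assumed by the tests:
    //   "SEQ" | version(1) | keyLen(1) keyClass | valLen(1) valClass
    //   | compressed(1) | blockCompressed(1) | codecLen(1) codecClass
    static String codecClassName(final byte[] data) {
        int pos = 3 + 1;          // skip the "SEQ" magic and the version byte
        pos += 1 + data[pos];     // skip key class length byte + name
        pos += 1 + data[pos];     // skip value class length byte + name
        pos += 2;                 // skip compression + block-compression flags
        final int codecLen = data[pos++];
        return new String(data, pos, codecLen, StandardCharsets.UTF_8);
    }

With such a helper, the last few assertions in each test collapse to a single
assertEquals(BZip2Codec.class.getCanonicalName(), codecClassName(data))
(or DefaultCodec, respectively).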