Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/02 20:15:54 UTC

nifi git commit: NIFI-2963 FetchHDFS should support Compression Codec property

Repository: nifi
Updated Branches:
  refs/heads/master b5550ffcf -> 1abd017c3


NIFI-2963 FetchHDFS should support Compression Codec property

This closes #1166


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1abd017c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1abd017c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1abd017c

Branch: refs/heads/master
Commit: 1abd017c353d2b45319e3a9e03281bba170c7d55
Parents: b5550ff
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Oct 28 12:35:37 2016 +0200
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Wed Nov 2 16:15:09 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/hadoop/FetchHDFS.java       | 43 +++++++++++++-
 .../nifi/processors/hadoop/TestFetchHDFS.java   | 60 ++++++++++++++++++++
 2 files changed, 100 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
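
The change adds the Compression Codec property to FetchHDFS, including an AUTOMATIC mode that infers the codec from the file extension; the COMPRESSION_CODEC descriptor and getCompressionCodec(...) helper referenced in the diff come from AbstractHadoopProcessor, which FetchHDFS extends. A minimal, self-contained sketch (not part of the commit) of the inference call the AUTOMATIC mode relies on, assuming the Hadoop client libraries are on the classpath; Hadoop's CompressionCodecFactory maps a file extension to a registered codec and returns null when nothing matches:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecInferenceSketch {
        public static void main(String[] args) {
            final Configuration conf = new Configuration();
            final CompressionCodecFactory factory = new CompressionCodecFactory(conf);

            // ".gz" resolves to a GzipCodec instance, so FetchHDFS would wrap
            // the HDFS stream with codec.createInputStream(...)
            final CompressionCodec gzip = factory.getCodec(new Path("/data/randombytes-1.gz"));
            System.out.println(gzip != null); // true

            // ".zip" is not a registered codec: getCodec(...) returns null and
            // the content passes through untouched (see the third test below)
            final CompressionCodec zip = factory.getCodec(new Path("/data/13545423550275052.zip"));
            System.out.println(zip == null); // true
        }
    }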


http://git-wip-us.apache.org/repos/asf/nifi/blob/1abd017c/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 3b1cce2..cd5f76c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -16,9 +16,13 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -29,6 +33,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -38,6 +43,7 @@ import org.apache.nifi.util.StopWatch;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -83,6 +89,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> props = new ArrayList<>(properties);
         props.add(FILENAME);
+        props.add(COMPRESSION_CODEC);
         return props;
     }
 
@@ -116,10 +123,38 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }
 
+        InputStream stream = null;
+        CompressionCodec codec = null;
+        Configuration conf = getConfiguration();
+        final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
+        final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+        final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+
+        if(inferCompressionCodec) {
+            codec = compressionCodecFactory.getCodec(path);
+        } else if (compressionType != CompressionType.NONE) {
+            codec = getCompressionCodec(context, getConfiguration());
+        }
+
         final URI uri = path.toUri();
         final StopWatch stopWatch = new StopWatch(true);
-        try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
-            flowFile = session.importFrom(inStream, flowFile);
+        try {
+
+            final String outputFilename;
+            final String originalFilename = path.getName();
+            stream = hdfs.open(path, 16384);
+
+            // Check if compression codec is defined (inferred or otherwise)
+            if (codec != null) {
+                stream = codec.createInputStream(stream);
+                outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+            } else {
+                outputFilename = originalFilename;
+            }
+
+            flowFile = session.importFrom(stream, flowFile);
+            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
+
             stopWatch.stop();
             getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
             session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
@@ -133,6 +168,8 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_COMMS_FAILURE);
+        } finally {
+            IOUtils.closeQuietly(stream);
         }
     }
 

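A note on the importFrom path above: when a codec applies, the flow file's filename attribute is rewritten without the codec's default extension via commons-lang. A tiny illustration of that call (filenames taken from the tests below):

    import org.apache.commons.lang.StringUtils;

    // GzipCodec.getDefaultExtension() returns ".gz"
    String stripped = StringUtils.removeEnd("randombytes-1.gz", ".gz");       // "randombytes-1"
    // When the suffix is absent, removeEnd returns the input unchanged
    String untouched = StringUtils.removeEnd("13545423550275052.zip", ".gz"); // "13545423550275052.zip"

The stream handling also changes shape: a method-local InputStream, wrapped by codec.createInputStream(...) when a codec is present, replaces the old try-with-resources, and IOUtils.closeQuietly(stream) in the finally block ensures the decompressor stream is closed even on failure.
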
http://git-wip-us.apache.org/repos/asf/nifi/blob/1abd017c/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 6153980..8b8b568 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -25,9 +27,13 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -104,6 +110,60 @@ public class TestFetchHDFS {
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testAutomaticDecompression() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testInferCompressionCodecDisabled() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testFileExtensionNotACompressionCodec() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/13545423550275052.zip");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
+
     private static class TestableFetchHDFS extends FetchHDFS {
         private final KerberosProperties testKerberosProps;