Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/02 20:15:54 UTC
nifi git commit: NIFI-2963 FetchHDFS should support Compression Codec property
Repository: nifi
Updated Branches:
refs/heads/master b5550ffcf -> 1abd017c3
NIFI-2963 FetchHDFS should support Compression Codec property
This closes #1166
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1abd017c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1abd017c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1abd017c
Branch: refs/heads/master
Commit: 1abd017c353d2b45319e3a9e03281bba170c7d55
Parents: b5550ff
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Oct 28 12:35:37 2016 +0200
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Wed Nov 2 16:15:09 2016 -0400
----------------------------------------------------------------------
.../nifi/processors/hadoop/FetchHDFS.java | 43 +++++++++++++-
.../nifi/processors/hadoop/TestFetchHDFS.java | 60 ++++++++++++++++++++
2 files changed, 100 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
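[Editor's note, not part of the commit: the patch adds a Compression Codec property to FetchHDFS with the usual AUTOMATIC / NONE / explicit-codec choices. With AUTOMATIC, the codec is inferred from the file extension via Hadoop's CompressionCodecFactory. The following minimal sketch illustrates that inference; the class name CodecInferenceDemo and the paths are made up for the example.]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecInferenceDemo {
    public static void main(String[] args) {
        final CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        // ".gz" resolves to GzipCodec; an extension with no registered codec
        // (e.g. ".zip") yields null, and FetchHDFS then streams the file as-is.
        for (String name : new String[]{"/data/randombytes-1.gz", "/data/13545423550275052.zip"}) {
            final CompressionCodec codec = factory.getCodec(new Path(name));
            System.out.println(name + " -> " + (codec == null ? "no codec" : codec.getClass().getSimpleName()));
        }
    }
}
----------------------------------------------------------------------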
http://git-wip-us.apache.org/repos/asf/nifi/blob/1abd017c/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 3b1cce2..cd5f76c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -16,9 +16,13 @@
*/
package org.apache.nifi.processors.hadoop;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.security.AccessControlException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -29,6 +33,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -38,6 +43,7 @@ import org.apache.nifi.util.StopWatch;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
@@ -83,6 +89,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(FILENAME);
+ props.add(COMPRESSION_CODEC);
return props;
}
@@ -116,10 +123,38 @@ public class FetchHDFS extends AbstractHadoopProcessor {
return;
}
+ InputStream stream = null;
+ CompressionCodec codec = null;
+ Configuration conf = getConfiguration();
+ final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
+ final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+ final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+
+ if(inferCompressionCodec) {
+ codec = compressionCodecFactory.getCodec(path);
+ } else if (compressionType != CompressionType.NONE) {
+ codec = getCompressionCodec(context, getConfiguration());
+ }
+
final URI uri = path.toUri();
final StopWatch stopWatch = new StopWatch(true);
- try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
- flowFile = session.importFrom(inStream, flowFile);
+ try {
+
+ final String outputFilename;
+ final String originalFilename = path.getName();
+ stream = hdfs.open(path, 16384);
+
+ // Check if compression codec is defined (inferred or otherwise)
+ if (codec != null) {
+ stream = codec.createInputStream(stream);
+ outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+ } else {
+ outputFilename = originalFilename;
+ }
+
+ flowFile = session.importFrom(stream, flowFile);
+ flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
+
stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
@@ -133,6 +168,8 @@ public class FetchHDFS extends AbstractHadoopProcessor {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_COMMS_FAILURE);
+ } finally {
+ IOUtils.closeQuietly(stream);
}
}
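[Editor's note, illustrative and not part of the commit: the two Hadoop calls the patch relies on can be exercised against a local gzip file without HDFS. codec.createInputStream() wraps the raw stream for decompression, and the output filename is derived by stripping the codec's default extension (".gz" for GzipCodec), which is exactly what the tests below assert. The demo class and local file name are assumptions for the example.]

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class DecompressDemo {
    public static void main(String[] args) throws IOException {
        // GzipCodec is Configurable, so instantiate it through ReflectionUtils
        final CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, new Configuration());

        final String originalFilename = "randombytes-1.gz"; // assumed local test file
        // Same derivation as the patch: strip the codec's default extension;
        // StringUtils.removeEnd is a no-op if the name does not end with it.
        final String outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
        System.out.println(outputFilename); // prints: randombytes-1

        InputStream in = null;
        try {
            // Wrap the raw stream in the codec's decompressing stream,
            // mirroring codec.createInputStream(stream) in FetchHDFS
            in = codec.createInputStream(new FileInputStream(originalFilename));
            System.out.println(IOUtils.toByteArray(in).length + " bytes after decompression");
        } finally {
            IOUtils.closeQuietly(in);
        }
    }
}
----------------------------------------------------------------------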
http://git-wip-us.apache.org/repos/asf/nifi/blob/1abd017c/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 6153980..8b8b568 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -16,7 +16,9 @@
*/
package org.apache.nifi.processors.hadoop;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -25,9 +27,13 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -104,6 +110,60 @@ public class TestFetchHDFS {
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
}
+ @Test
+ public void testAutomaticDecompression() throws IOException {
+ FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+ runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+ runner.enqueue(new String("trigger flow file"));
+ runner.run();
+
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ MockFlowFile flowFile = flowFiles.get(0);
+ assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
+ InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
+ flowFile.assertContentEquals(expected);
+ }
+
+ @Test
+ public void testInferCompressionCodecDisabled() throws IOException {
+ FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+ runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+ runner.enqueue(new String("trigger flow file"));
+ runner.run();
+
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ MockFlowFile flowFile = flowFiles.get(0);
+ assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+ InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
+ flowFile.assertContentEquals(expected);
+ }
+
+ @Test
+ public void testFileExtensionNotACompressionCodec() throws IOException {
+ FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/13545423550275052.zip");
+ runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+ runner.enqueue(new String("trigger flow file"));
+ runner.run();
+
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ MockFlowFile flowFile = flowFiles.get(0);
+ assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+ InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+ flowFile.assertContentEquals(expected);
+ }
+
private static class TestableFetchHDFS extends FetchHDFS {
private final KerberosProperties testKerberosProps;