Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/28 16:53:36 UTC

[4/7] incubator-nifi git commit: NIFI-533: Refactored for Unit Tests and added unit tests for ListHDFS

NIFI-533: Refactored for Unit Tests and added unit tests for ListHDFS


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e4f43156
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e4f43156
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e4f43156

Branch: refs/heads/ListHDFS
Commit: e4f431561e6af7a603a5c8b7a82910f28dc6d600
Parents: dc7f7a8
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Apr 28 08:46:38 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Apr 28 08:46:38 2015 -0400

----------------------------------------------------------------------
 .../hadoop/AbstractHadoopProcessor.java         |  24 +-
 .../hadoop/CreateHadoopSequenceFile.java        |   2 +-
 .../nifi/processors/hadoop/FetchHDFS.java       |   2 +-
 .../apache/nifi/processors/hadoop/GetHDFS.java  |  10 +-
 .../processors/hadoop/GetHDFSSequenceFile.java  |  12 +-
 .../apache/nifi/processors/hadoop/ListHDFS.java |  42 ++-
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  21 +-
 .../nifi/processors/hadoop/TestListHDFS.java    | 347 +++++++++++++++++++
 8 files changed, 416 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 91c21a0..355950f 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -70,7 +70,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
 
     // variables shared by all threads of this processor
     // Hadoop Configuration and FileSystem
-    protected final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
+    private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
 
     @Override
     protected void init(ProcessorInitializationContext context) {
@@ -153,7 +153,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
             config.set(disableCacheName, "true");
 
-            final FileSystem fs = FileSystem.get(config);
+            final FileSystem fs = getFileSystem(config);
             getLogger().info(
                     "Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                     new Object[]{fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)),
@@ -165,6 +165,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         }
     }
 
+    /**
+     * This exists in order to allow unit tests to override it so that they don't take several minutes waiting
+     * for UDP packets to be received
+     *
+     * @param config the configuration to use
+     * @return the FileSystem that is created for the given Configuration
+     * @throws IOException if unable to create the FileSystem
+     */
+    protected FileSystem getFileSystem(final Configuration config) throws IOException {
+        return FileSystem.get(config);
+    }
+
     /*
      * Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
      */
@@ -243,4 +255,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         }
         return builder.toString();
     }
+
+    protected Configuration getConfiguration() {
+        return hdfsResources.get().getKey();
+    }
+
+    protected FileSystem getFileSystem() {
+        return hdfsResources.get().getValue();
+    }
 }
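
Editor's note: the practical effect of the new protected getFileSystem(Configuration) hook above is that a unit test can subclass a processor and hand back a FileSystem that never touches the network, instead of letting FileSystem.get(config) wait on the configured NameNode. A minimal sketch under that assumption follows; the class name is hypothetical, and it simply substitutes Hadoop's local filesystem, whereas the new TestListHDFS further down supplies a fully mocked FileSystem.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Test-only subclass (hypothetical name): overrides the protected hook added in
    // this commit so no real HDFS connection is attempted. Because onScheduled stores
    // whatever this returns into hdfsResources, the no-arg getFileSystem() accessor
    // then hands the same stand-in back to onTrigger.
    public class LocalFileSystemListHDFS extends ListHDFS {

        @Override
        protected FileSystem getFileSystem(final Configuration config) throws IOException {
            // Replaces FileSystem.get(config), which would otherwise block on the
            // configured remote filesystem and slow unit tests down.
            return FileSystem.getLocal(config);
        }
    }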

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index f462277..186a290 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -156,7 +156,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
         try {
-            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType);
+            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType);
             session.transfer(flowFile, RELATIONSHIP_SUCCESS);
             getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
         } catch (ProcessException e) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index b5efce0..4a52fb7 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -99,7 +99,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final FileSystem hdfs = hdfsResources.get().getValue();
+        final FileSystem hdfs = getFileSystem();
         final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
         final URI uri = path.toUri();
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index f7894d9..7aa534f 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -236,8 +236,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
         abstractOnScheduled(context);
         // copy configuration values to pass them around cleanly
         processorConfig = new ProcessorConfiguration(context);
-        FileSystem fs = hdfsResources.get().getValue();
-        Path dir = new Path(context.getProperty(DIRECTORY).getValue());
+        final FileSystem fs = getFileSystem();
+        final Path dir = new Path(context.getProperty(DIRECTORY).getValue());
         if (!fs.exists(dir)) {
             throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
         }
@@ -330,8 +330,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
         // process the batch of files
         FSDataInputStream stream = null;
-        Configuration conf = hdfsResources.get().getKey();
-        FileSystem hdfs = hdfsResources.get().getValue();
+        Configuration conf = getConfiguration();
+        FileSystem hdfs = getFileSystem();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
@@ -398,7 +398,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
         if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
             try {
-                final FileSystem hdfs = hdfsResources.get().getValue();
+                final FileSystem hdfs = getFileSystem();
                 // get listing
                 listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null);
                 lastPollTime.set(System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index 22ba36b..f032ee4 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -34,10 +37,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 /**
  * This processor is used to pull files from HDFS. The files being pulled in MUST be SequenceFile formatted files. The processor creates a flow file for each key/value entry in the ingested
@@ -80,9 +79,8 @@ public class GetHDFSSequenceFile extends GetHDFS {
 
     @Override
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
-        final Tuple<Configuration, FileSystem> hadoopResources = hdfsResources.get();
-        final Configuration conf = hadoopResources.getKey();
-        final FileSystem hdfs = hadoopResources.getValue();
+        final Configuration conf = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
         final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 56a128a..151cbf2 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -118,12 +118,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private volatile Long lastListingTime = null;
     private volatile Set<Path> latestPathsListed = new HashSet<>();
     private volatile boolean electedPrimaryNode = false;
-    private File persistenceFile = null;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
         super.init(context);
-        persistenceFile = new File("conf/state/" + getIdentifier());
+    }
+
+    protected File getPersistenceFile() {
+        return new File("conf/state/" + getIdentifier());
     }
 
     @Override
@@ -143,7 +145,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return relationships;
     }
 
-    private String getKey(final String directory) {
+    protected String getKey(final String directory) {
         return getIdentifier() + ".lastListingTime." + directory;
     }
 
@@ -169,18 +171,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
     }
 
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final String directory = context.getProperty(DIRECTORY).getValue();
-
+    private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
         // Determine the timestamp for the last file that we've listed.
         Long minTimestamp = lastListingTime;
         if ( minTimestamp == null || electedPrimaryNode ) {
             // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
             // we have performed a listing. In this case,
             // First, attempt to get timestamp from distributed cache service.
-            final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
             try {
                 final StringSerDe serde = new StringSerDe();
                 final String serializedState = client.get(getKey(directory), serde, serde);
@@ -197,14 +194,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
                 this.lastListingTime = minTimestamp;
                 electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
             } catch (final IOException ioe) {
-                getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
-                context.yield();
-                return;
+                throw ioe;
             }
 
             // Check the persistence file. We want to use the latest timestamp that we have so that
             // we don't duplicate data.
             try {
+                final File persistenceFile = getPersistenceFile();
                 if ( persistenceFile.exists() ) {
                     try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
                         final Properties props = new Properties();
@@ -240,9 +236,25 @@ public class ListHDFS extends AbstractHadoopProcessor {
             }
         }
 
+        return minTimestamp;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String directory = context.getProperty(DIRECTORY).getValue();
+        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        final Long minTimestamp;
+        try {
+            minTimestamp = getMinTimestamp(directory, client);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
+            context.yield();
+            return;
+        }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = hdfsResources.get().getValue();
+        final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
         final Path rootPath = new Path(directory);
 
@@ -311,7 +323,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
                 }
 
                 // Attempt to save state to remote server.
-                final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
                 try {
                     client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
                 } catch (final IOException ioe) {
@@ -397,11 +408,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
-    private void persistLocalState(final String directory, final String serializedState) throws IOException {
+    protected void persistLocalState(final String directory, final String serializedState) throws IOException {
         // we need to keep track of all files that we pulled in that had a modification time equal to
         // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
         // that have a mod time equal to that timestamp because more files may come in with the same timestamp
         // later in the same millisecond.
+        final File persistenceFile = getPersistenceFile();
         final File dir = persistenceFile.getParentFile();
         if ( !dir.exists() && !dir.mkdirs() ) {
             throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
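
Editor's note: two smaller hooks in ListHDFS round out the testability story: getPersistenceFile() is now computed on demand and protected, so a test can point local state at a scratch directory, and the distributed-cache lookup now lives in getMinTimestamp, whose IOException is caught in onTrigger (log, yield, return) rather than handled mid-method. A minimal sketch of the persistence-file override, assuming a hypothetical class name; the real TestListHDFS below redirects state to target/conf/state-file in the same way.

    import java.io.File;

    // Test-only subclass (hypothetical name): keep ListHDFS's local listing state
    // under target/ so repeated test runs start clean and nothing is written to the
    // framework's conf/state directory.
    public class ScratchStateListHDFS extends ListHDFS {

        @Override
        protected File getPersistenceFile() {
            // Production code returns new File("conf/state/" + getIdentifier()).
            return new File("target/test-state/" + getIdentifier());
        }
    }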

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 057f786..52cf475 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -54,7 +54,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
 
 /**
  * This processor copies FlowFiles to HDFS.
@@ -183,8 +182,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         } else {
             dfsUmask = FsPermission.DEFAULT_UMASK;
         }
-        final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-        final Configuration conf = resources.getKey();
+        final Configuration conf = getConfiguration();
         FsPermission.setUMask(conf, new FsPermission(dfsUmask));
     }
 
@@ -195,26 +193,23 @@ public class PutHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-        if (resources == null || resources.getKey() == null || resources.getValue() == null) {
+        final Configuration configuration = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
+        if (configuration == null || hdfs == null) {
             getLogger().error("HDFS not configured properly");
             session.transfer(flowFile, REL_FAILURE);
             context.yield();
             return;
         }
-        final Configuration conf = resources.getKey();
-        final FileSystem hdfs = resources.getValue();
 
-        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile)
-                .getValue());
+        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
         final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
 
         final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
         final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
 
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
-                BUFFER_SIZE_DEFAULT);
+        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
 
         final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
         final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
@@ -230,7 +225,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
             // Create destination directory if it does not exist
             try {
-                if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) {
+                if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
                     throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
                 }
             } catch (FileNotFoundException fe) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
new file mode 100644
index 0000000..499fd51
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestListHDFS {
+
+    private TestRunner runner;
+    private ListHDFSWithMockedFileSystem proc;
+    private MockCacheClient service;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new ListHDFSWithMockedFileSystem();
+        runner = TestRunners.newTestRunner(proc);
+
+        service = new MockCacheClient();
+        runner.addControllerService("service", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
+        runner.setProperty(ListHDFS.DIRECTORY, "/test");
+        runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
+    }
+
+    @Test
+    public void testListingHasCorrectAttributes() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("path", "/test");
+        mff.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+
+    @Test
+    public void testRecursive() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+
+        final MockFlowFile mff2 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(1);
+        mff2.assertAttributeEquals("path", "/test/testDir");
+        mff2.assertAttributeEquals("filename", "1.txt");
+    }
+
+    @Test
+    public void testNotRecursive() {
+        runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+
+    @Test
+    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+
+        runner.clearTransferState();
+
+        // add new file to pull
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+
+        // trigger primary node change
+        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+        // cause calls to service to fail
+        service.failOnCalls = true;
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        final String key = proc.getKey("/test");
+
+        // wait just a bit to ensure that the timestamp changes when we update the service
+        final Object curVal = service.values.get(key);
+        try {
+            Thread.sleep(10L);
+        } catch (final InterruptedException ie) {
+        }
+
+        service.failOnCalls = false;
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        // ensure state saved both locally & remotely
+        assertTrue(proc.localStateSaved);
+        assertNotNull(service.values.get(key));
+        assertNotSame(curVal, service.values.get(key));
+    }
+
+
+    private FsPermission create777() {
+        return new FsPermission((short) 0777);
+    }
+
+
+    private class ListHDFSWithMockedFileSystem extends ListHDFS {
+        private final MockFileSystem fileSystem = new MockFileSystem();
+        private boolean localStateSaved = false;
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return fileSystem;
+        }
+
+        @Override
+        protected File getPersistenceFile() {
+            return new File("target/conf/state-file");
+        }
+
+        @Override
+        protected FileSystem getFileSystem(Configuration config) throws IOException {
+            return fileSystem;
+        }
+
+        @Override
+        protected void persistLocalState(String directory, String serializedState) throws IOException {
+            super.persistLocalState(directory, serializedState);
+            localStateSaved = true;
+        }
+    }
+
+
+    private class MockFileSystem extends FileSystem {
+        private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+
+        public void addFileStatus(final Path parent, final FileStatus child) {
+            Set<FileStatus> children = fileStatuses.get(parent);
+            if ( children == null ) {
+                children = new HashSet<>();
+                fileStatuses.put(parent, children);
+            }
+
+            children.add(child);
+        }
+
+
+        @Override
+        public long getDefaultBlockSize() {
+            return 1024L;
+        }
+
+        @Override
+        public short getDefaultReplication() {
+            return 1;
+        }
+
+        @Override
+        public URI getUri() {
+            return null;
+        }
+
+        @Override
+        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public boolean rename(Path src, Path dst) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean delete(Path f, boolean recursive) throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+            final Set<FileStatus> statuses = fileStatuses.get(f);
+            if ( statuses == null ) {
+                return new FileStatus[0];
+            }
+
+            return statuses.toArray(new FileStatus[statuses.size()]);
+        }
+
+        @Override
+        public void setWorkingDirectory(Path new_dir) {
+
+        }
+
+        @Override
+        public Path getWorkingDirectory() {
+            return new Path(new File(".").getAbsolutePath());
+        }
+
+        @Override
+        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus getFileStatus(Path f) throws IOException {
+            return null;
+        }
+
+    }
+
+
+    private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
+        private ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
+        private boolean failOnCalls = false;
+
+        private void verifyNotFail() throws IOException {
+            if ( failOnCalls ) {
+                throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
+            }
+        }
+
+        @Override
+        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            final Object retValue = values.putIfAbsent(key, value);
+            return (retValue == null);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.putIfAbsent(key, value);
+        }
+
+        @Override
+        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+            verifyNotFail();
+            return values.containsKey(key);
+        }
+
+        @Override
+        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            values.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+            verifyNotFail();
+            values.remove(key);
+            return true;
+        }
+    }
+}