You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/04/27 15:52:18 UTC

nifi git commit: NIFI-5090: Create move target dir dynamically at FetchFTP and FetchSFTP

Repository: nifi
Updated Branches:
  refs/heads/master 159b64b4c -> 6e8b1c8f7


NIFI-5090: Create move target dir dynamically at FetchFTP and FetchSFTP

Support creating target parent directories even if directory listing is disabled.

fixed typo in doc

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2642


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6e8b1c8f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6e8b1c8f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6e8b1c8f

Branch: refs/heads/master
Commit: 6e8b1c8f74b957eaef04d1fa7574c29d9c3acbe0
Parents: 159b64b
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Apr 18 11:14:41 2018 +0900
Committer: Matthew Burgess <ma...@apache.org>
Committed: Fri Apr 27 11:48:26 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/FetchFTP.java      |   6 +-
 .../processors/standard/FetchFileTransfer.java  |  32 ++-
 .../nifi/processors/standard/FetchSFTP.java     |  10 +
 .../processors/standard/PutFileTransfer.java    |   6 +-
 .../processors/standard/util/FileTransfer.java  |  16 ++
 .../processors/standard/util/SFTPTransfer.java  |  64 ++---
 .../standard/TestFetchFileTransfer.java         |  30 ++-
 .../standard/util/TestSFTPTransfer.java         | 247 +++++++++++++++++++
 8 files changed, 361 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
index 6f3f84d..4886274 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
@@ -29,11 +29,6 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.FileTransfer;
-import org.apache.nifi.processors.standard.FetchFileTransfer;
-import org.apache.nifi.processors.standard.GetFTP;
-import org.apache.nifi.processors.standard.GetSFTP;
-import org.apache.nifi.processors.standard.PutFTP;
-import org.apache.nifi.processors.standard.PutSFTP;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
 
 // Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
@@ -62,6 +57,7 @@ public class FetchFTP extends FetchFileTransfer {
         properties.add(REMOTE_FILENAME);
         properties.add(COMPLETION_STRATEGY);
         properties.add(MOVE_DESTINATION_DIR);
+        properties.add(MOVE_CREATE_DIRECTORY);
         properties.add(FTPTransfer.CONNECTION_TIMEOUT);
         properties.add(FTPTransfer.DATA_TIMEOUT);
         properties.add(FTPTransfer.USE_COMPRESSION);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 3ba0066..c44a406 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -37,6 +37,7 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.Tuple;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -101,11 +102,19 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         .defaultValue(COMPLETION_NONE.getValue())
         .required(true)
         .build();
+    static final PropertyDescriptor MOVE_CREATE_DIRECTORY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(FileTransfer.CREATE_DIRECTORY).description(String.format("Used when '%s' is '%s'. %s",
+                    COMPLETION_STRATEGY.getDisplayName(),
+                    COMPLETION_MOVE.getDisplayName(),
+                    FileTransfer.CREATE_DIRECTORY.getDescription()))
+            .required(false)
+            .build();
     static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
         .name("Move Destination Directory")
-        .description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. "
-            + "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on"
-            + "the remote system, or the rename will fail.")
+        .description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+            + "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on "
+            + "the remote system if '%s' is disabled, or the rename will fail.",
+                COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName()))
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .required(false)
@@ -189,6 +198,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         properties.add(REMOTE_FILENAME);
         properties.add(COMPLETION_STRATEGY);
         properties.add(MOVE_DESTINATION_DIR);
+        properties.add(MOVE_CREATE_DIRECTORY);
         return properties;
     }
 
@@ -308,15 +318,19 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
                             new Object[]{flowFile, host, port, filename, ioe}, ioe);
                 }
             } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
-                String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
-                if (!targetDir.endsWith("/")) {
-                    targetDir = targetDir + "/";
-                }
+                final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
                 final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
-                final String target = targetDir + simpleFilename;
 
                 try {
-                    transfer.rename(flowFile, filename, target);
+                    final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
+                    final File targetFile = new File(absoluteTargetDirPath, simpleFilename);
+                    if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
+                        // Create the target directory if necessary.
+                        transfer.ensureDirectoryExists(flowFile, targetFile.getParentFile());
+                    }
+
+                    transfer.rename(flowFile, filename, targetFile.getAbsolutePath());
+
                 } catch (final IOException ioe) {
                     getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
                             new Object[]{flowFile, host, port, filename, ioe}, ioe);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 79eb1e6..6846557 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -49,6 +49,14 @@ public class FetchSFTP extends FetchFileTransfer {
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
+        final PropertyDescriptor disableDirectoryListing = new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(SFTPTransfer.DISABLE_DIRECTORY_LISTING)
+                .description(String.format("Control how '%s' is created when '%s' is '%s' and '%s' is enabled. %s",
+                        MOVE_DESTINATION_DIR.getDisplayName(),
+                        COMPLETION_STRATEGY.getDisplayName(),
+                        COMPLETION_MOVE.getDisplayName(),
+                        MOVE_CREATE_DIRECTORY.getDisplayName(),
+                        SFTPTransfer.DISABLE_DIRECTORY_LISTING.getDescription())).build();
 
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(HOSTNAME);
@@ -60,6 +68,8 @@ public class FetchSFTP extends FetchFileTransfer {
         properties.add(REMOTE_FILENAME);
         properties.add(COMPLETION_STRATEGY);
         properties.add(MOVE_DESTINATION_DIR);
+        properties.add(MOVE_CREATE_DIRECTORY);
+        properties.add(disableDirectoryListing);
         properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
         properties.add(SFTPTransfer.DATA_TIMEOUT);
         properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index cbaa9ec..12eafdb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@ -106,11 +106,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
                 if (rootPath == null) {
                     workingDirPath = null;
                 } else {
-                    File workingDirectory = new File(rootPath);
-                    if (!workingDirectory.getPath().startsWith("/") && !workingDirectory.getPath().startsWith("\\")) {
-                        workingDirectory = new File(transfer.getHomeDirectory(flowFile), workingDirectory.getPath());
-                    }
-                    workingDirPath = workingDirectory.getPath().replace("\\", "/");
+                    workingDirPath = transfer.getAbsolutePath(flowFile, rootPath);
                 }
 
                 final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean();

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index 1163ea8..d500b9d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -58,6 +58,22 @@ public interface FileTransfer extends Closeable {
 
     void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException;
 
+    /**
+     * Compute an absolute file path for the given remote path.
+     * @param flowFile is used to setup file transfer client with its attribute values, to get user home directory
+     * @param remotePath the target remote path
+     * @return The absolute path for the given remote path
+     */
+    default String getAbsolutePath(FlowFile flowFile, String remotePath) throws IOException {
+        final String absoluteRemotePath;
+        if (!remotePath.startsWith("/") && !remotePath.startsWith("\\")) {
+            absoluteRemotePath = new File(getHomeDirectory(flowFile), remotePath).getPath();
+        } else {
+            absoluteRemotePath = remotePath;
+        }
+        return absoluteRemotePath.replace("\\", "/");
+    }
+
     public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
         .name("Hostname")
         .description("The fully qualified hostname or IP address of the remote system")

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index d7aa6e3..c11a53b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -106,7 +106,7 @@ public class SFTPTransfer implements FileTransfer {
         .description("If set to 'true', directory listing is not performed prior to create missing directories." +
                 " By default, this processor executes a directory listing command" +
                 " to see target directory existence before creating missing directories." +
-                " However, there are situations that you might need to disable the directory listing such as followings." +
+                " However, there are situations that you might need to disable the directory listing such as the following." +
                 " Directory listing might fail with some permission setups (e.g. chmod 100) on a directory." +
                 " Also, if any other SFTP client created the directory after this processor performed a listing" +
                 " and before a directory creation request by this processor is finished," +
@@ -353,48 +353,52 @@ public class SFTPTransfer implements FileTransfer {
         final String remoteDirectory = directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", "");
 
         // if we disable the directory listing, we just want to blindly perform the mkdir command,
-        // eating any exceptions thrown (like if the directory already exists).
+        // eating failure exceptions thrown (like if the directory already exists).
         if (disableDirectoryListing) {
             try {
+                // Blindly create the dir.
                 channel.mkdir(remoteDirectory);
+                // The remote directory did not exist, and was created successfully.
+                return;
             } catch (SftpException e) {
-                if (e.id != ChannelSftp.SSH_FX_FAILURE) {
+                if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
+                    // No Such File. This happens when parent directory was not found.
+                    logger.debug(String.format("Could not create %s due to 'No such file'. Will try to create the parent dir.", remoteDirectory));
+                } else if (e.id == ChannelSftp.SSH_FX_FAILURE) {
+                    // Swallow '4: Failure' including the remote directory already exists.
+                    logger.debug("Could not blindly create remote directory due to " + e.getMessage(), e);
+                    return;
+                } else {
                     throw new IOException("Could not blindly create remote directory due to " + e.getMessage(), e);
                 }
             }
-            return;
+        } else {
+            try {
+                // Check dir existence.
+                channel.stat(remoteDirectory);
+                // The remote directory already exists.
+                return;
+            } catch (final SftpException e) {
+                if (e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE) {
+                    throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e);
+                }
+            }
         }
-        // end if disableDirectoryListing
 
-        boolean exists;
+        // first ensure parent directories exist before creating this one
+        if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
+            ensureDirectoryExists(flowFile, directoryName.getParentFile());
+        }
+        logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
         try {
-            channel.stat(remoteDirectory);
-            exists = true;
+            channel.mkdir(remoteDirectory);
+            logger.debug("Created {}", new Object[] {remoteDirectory});
         } catch (final SftpException e) {
-            if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
-                // No Such File
-                exists = false;
-            } else {
-                throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e);
-            }
-        }
-
-        if (!exists) {
-            // first ensure parent directories exist before creating this one
-            if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
-                ensureDirectoryExists(flowFile, directoryName.getParentFile());
-            }
-            logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
-            try {
-                channel.mkdir(remoteDirectory);
-                logger.debug("Created {}", new Object[] {remoteDirectory});
-            } catch (final SftpException e) {
-                throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
-            }
+            throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
         }
     }
 
-    private ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
+    protected ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
         if (sftp != null) {
             String sessionhost = session.getHost();
             String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -633,7 +637,7 @@ public class SFTPTransfer implements FileTransfer {
         } catch (final SftpException e) {
             switch (e.id) {
                 case ChannelSftp.SSH_FX_NO_SUCH_FILE:
-                    throw new FileNotFoundException();
+                    throw new FileNotFoundException("No such file or directory");
                 case ChannelSftp.SSH_FX_PERMISSION_DENIED:
                     throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
                 default:

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
index 4965893..de76b07 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -130,6 +130,7 @@ public class TestFetchFileTransfer {
         runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
         runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
         runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
+        runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");
 
         proc.addContent("hello.txt", "world".getBytes());
         final Map<String, String> attrs = new HashMap<>();
@@ -228,10 +229,35 @@ public class TestFetchFileTransfer {
         assertTrue(proc.fileContents.containsKey("hello.txt"));
     }
 
+    @Test
+    public void testCreateDirFails() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
+        runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+        proc.allowCreateDir = false;
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertEquals(1, proc.fileContents.size());
+
+        assertTrue(proc.fileContents.containsKey("hello.txt"));
+    }
+
 
     private static class TestableFetchFileTransfer extends FetchFileTransfer {
         private boolean allowAccess = true;
         private boolean allowDelete = true;
+        private boolean allowCreateDir = true;
         private boolean allowRename = true;
         private boolean closed = false;
         private final Map<String, byte[]> fileContents = new HashMap<>();
@@ -340,7 +366,9 @@ public class TestFetchFileTransfer {
 
                 @Override
                 public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
-
+                    if (!allowCreateDir) {
+                        throw new PermissionDeniedException("test permission denied");
+                    }
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
new file mode 100644
index 0000000..9a17838
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.SftpException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_FAILURE;
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_NO_SUCH_FILE;
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_PERMISSION_DENIED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestSFTPTransfer {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestSFTPTransfer.class);
+
+    private SFTPTransfer createSftpTransfer(ProcessContext processContext, ChannelSftp channel) {
+        final ComponentLog componentLog = mock(ComponentLog.class);
+        return new SFTPTransfer(processContext, componentLog) {
+            @Override
+            protected ChannelSftp getChannel(FlowFile flowFile) throws IOException {
+                return channel;
+            }
+        };
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsAlreadyExisted() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+        // Dir existence check should be done by stat
+        verify(channel).stat(eq("/dir1/dir2/dir3"));
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsFailedToStat() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        // stat for the parent was successful, simulating that dir2 exists, but no dir3.
+        when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_FAILURE, "Failure"));
+
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        try {
+            sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+            fail("Should fail");
+        } catch (IOException e) {
+            assertEquals("Failed to determine if remote directory exists at /dir1/dir2/dir3 due to 4: Failure", e.getMessage());
+        }
+
+        // Dir existence check should be done by stat
+        verify(channel).stat(eq("/dir1/dir2/dir3"));
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsNotExisted() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        // stat for the parent was successful, simulating that dir2 exists, but no dir3.
+        when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+        // Dir existence check should be done by stat
+        verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+        verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked
+        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsParentNotExisted() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        // stat for the dir1 was successful, simulating that dir1 exists, but no dir2 and dir3.
+        when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+        when(channel.stat("/dir1/dir2")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+        // Dir existence check should be done by stat
+        verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+        verify(channel).stat(eq("/dir1/dir2")); // dir2 was not found, too
+        verify(channel).stat(eq("/dir1")); // dir1 was found
+        verify(channel).mkdir(eq("/dir1/dir2")); // dir1 existed, so dir2 was created.
+        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // then dir3 was created.
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsNotExistedFailedToCreate() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        // stat for the parent was successful, simulating that dir2 exists, but no dir3.
+        when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+        // Failed to create dir3.
+        doThrow(new SftpException(SSH_FX_FAILURE, "Failed")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        try {
+            sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+            fail("Should fail");
+        } catch (IOException e) {
+            assertEquals("Failed to create remote directory /dir1/dir2/dir3 due to 4: Failed", e.getMessage());
+        }
+
+        // Dir existence check should be done by stat
+        verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+        verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked
+        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsBlindlyNotExisted() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
+
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+        // stat should not be called.
+        verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsBlindlyParentNotExisted() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
+
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        final AtomicInteger mkdirCount = new AtomicInteger(0);
+        doAnswer(invocation -> {
+            final int cnt = mkdirCount.getAndIncrement();
+            if (cnt == 0) {
+                // If the parent dir does not exist, no such file exception is thrown.
+                throw new SftpException(SSH_FX_NO_SUCH_FILE, "Failure");
+            } else {
+                logger.info("Created the dir successfully for the 2nd time");
+            }
+            return true;
+        }).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+        // stat should not be called.
+        verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+        // dir3 was created blindly, but failed for the 1st time, and succeeded for the 2nd time.
+        verify(channel, times(2)).mkdir(eq("/dir1/dir2/dir3"));
+        verify(channel).mkdir(eq("/dir1/dir2")); // dir2 was created successfully.
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsBlindlyAlreadyExisted() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
+
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        // If the dir existed, a failure exception is thrown, but should be swallowed.
+        doThrow(new SftpException(SSH_FX_FAILURE, "Failure")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+        // stat should not be called.
+        verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsBlindlyFailed() throws IOException, SftpException {
+        final ProcessContext processContext = mock(ProcessContext.class);
+        when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
+
+        final ChannelSftp channel = mock(ChannelSftp.class);
+        doThrow(new SftpException(SSH_FX_PERMISSION_DENIED, "Permission denied")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final MockFlowFile flowFile = new MockFlowFile(0);
+        final File remoteDir = new File("/dir1/dir2/dir3");
+        try {
+            sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+            fail("Should fail");
+        } catch (IOException e) {
+            assertEquals("Could not blindly create remote directory due to Permission denied", e.getMessage());
+        }
+
+        // stat should not be called.
+        verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
+    }
+
+}