You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/09/19 14:59:59 UTC

[nifi] branch master updated: NIFI-5816 This closes #3726. Remove unused SFTP classes that reference Jsch NIFI-5816 Switching SFTP processors from JSCH to SSHJ NIFI-5816 LICENSE/NOTICIE updates to reflect changing from JSch to SSHJ

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new ff6a7d9  NIFI-5816 This closes #3726. Remove unused SFTP classes that reference Jsch NIFI-5816 Switching SFTP processors from JSCH to SSHJ NIFI-5816 LICENSE/NOTICIE updates to reflect changing from JSch to SSHJ
ff6a7d9 is described below

commit ff6a7d95613fc01601d3f60b5fb976c10d89949f
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Fri Sep 6 15:12:38 2019 -0400

    NIFI-5816 This closes #3726. Remove unused SFTP classes that reference Jsch
    NIFI-5816 Switching SFTP processors from JSCH to SSHJ
    NIFI-5816 LICENSE/NOTICIE updates to reflect changing from JSch to SSHJ
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 nifi-assembly/LICENSE                              |  29 -
 nifi-assembly/NOTICE                               |  12 +
 .../org/apache/nifi/util/MockPropertyContext.java  |  52 ++
 .../processor/util/list/AbstractListProcessor.java |   4 +-
 .../src/main/resources/conf/logback.xml            |   4 +
 .../src/main/resources/META-INF/LICENSE            |  59 --
 .../src/main/resources/META-INF/NOTICE             |  12 +
 .../nifi-standard-processors/pom.xml               |   8 +-
 .../processors/standard/util/SFTPConnection.java   |  73 --
 .../standard/util/SFTPFlowFileSourceFile.java      |  94 +++
 .../processors/standard/util/SFTPTransfer.java     | 623 +++++++++-------
 .../nifi/processors/standard/util/SFTPUtils.java   | 312 --------
 .../processors/standard/util/TestSFTPTransfer.java | 134 ++--
 .../util/TestSFTPTransferWithSSHTestServer.java    | 786 +++++++++++++++++++++
 nifi-nar-bundles/nifi-standard-bundle/pom.xml      |  11 +-
 15 files changed, 1390 insertions(+), 823 deletions(-)

diff --git a/nifi-assembly/LICENSE b/nifi-assembly/LICENSE
index eb00732..b1e3de8 100644
--- a/nifi-assembly/LICENSE
+++ b/nifi-assembly/LICENSE
@@ -1419,35 +1419,6 @@ which is licensed under a BSD license.
 
     http://code.google.com/p/jsr-305/
 
-This product bundles 'JCraft Jzlib' which is available under a 3-Clause BSD License.
-
-    Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc.
-    All rights reserved.
-
-    Redistribution and use in source and binary forms, with or without
-    modification, are permitted provided that the following conditions are met:
-
-      1. Redistributions of source code must retain the above copyright notice,
-         this list of conditions and the following disclaimer.
-
-      2. Redistributions in binary form must reproduce the above copyright
-         notice, this list of conditions and the following disclaimer in
-         the documentation and/or other materials provided with the distribution.
-
-      3. The names of the authors may not be used to endorse or promote products
-         derived from this software without specific prior written permission.
-
-    THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
-    INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
-    FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT,
-    INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
-    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
-    OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-    LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-    NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
-    EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
 The binary distribution of this product includes modules from Groovy which bundles ASM
   /***
    * http://asm.objectweb.org/
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index d62f0bf..e8bfdcd 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -1822,6 +1822,18 @@ The following binary components are provided under the Apache Software License v
       Apache Yetus
       Copyright 2008-2018 The Apache Software Foundation
 
+  (ASLv2) sshj
+    The following NOTICE information applie:
+
+      sshj - SSHv2 library for Java
+      Copyright 2010-2012 sshj contributors
+
+      This product includes code derived from software developed at
+      The Apache Software Foundation (http://www.apache.org/):
+
+      - Apache MINA SSHD
+      - Apache Commons-Net
+
 ************************
 Common Development and Distribution License 1.1
 ************************
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyContext.java
new file mode 100644
index 0000000..5c4647d
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyContext.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class MockPropertyContext implements PropertyContext {
+
+    private final Map<PropertyDescriptor, String> properties;
+
+    public MockPropertyContext(final Map<PropertyDescriptor, String> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public PropertyValue getProperty(final PropertyDescriptor property) {
+        String value = properties.get(property);
+        if (value == null) {
+            value = property.getDefaultValue();
+        }
+        return new MockPropertyValue(value);
+    }
+
+    @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index c30bb0d..af5775f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -471,7 +471,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             // track of when this last executed for consideration of the lag nanos
             entityList = performListing(context, minTimestampToListMillis);
         } catch (final IOException e) {
-            getLogger().error("Failed to perform listing on remote host due to {}", e);
+            getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
             context.yield();
             return;
         }
@@ -740,7 +740,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             try {
                 return performListing(context, minTimestampToList);
             } catch (final IOException e) {
-                getLogger().error("Failed to perform listing on remote host due to {}", e);
+                getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
                 return Collections.emptyList();
             }
         }, entity -> createAttributes(entity, context));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index a16a49e..03d401a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -120,6 +120,10 @@
     <!-- Suppress non-error messages due to Jetty AnnotationParser emitting a large amount of WARNS. Issue described in NIFI-5479. -->
     <logger name="org.eclipse.jetty.annotations.AnnotationParser" level="ERROR"/>
 
+    <!-- Suppress non-error messages from SSHJ which was emitting large amounts of INFO logs by default -->
+    <logger name="net.schmizz.sshj" level="WARN" />
+    <logger name="com.hierynomus.sshj" level="WARN" />
+
     <!--
         Logger for capturing user events. We do not want to propagate these
         log events to the root logger. These messages are only sent to the
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE
index a09260f..aa8c950 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE
@@ -258,65 +258,6 @@ under an MIT style license.
     OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     THE SOFTWARE.
 
-This product bundes 'JCraft Jsch' which is available under a 3-Clause BSD 
-License.
-
-    Copyright (c) 2002-2015 Atsuhiko Yamanaka, JCraft,Inc. 
-    All rights reserved.
-
-    Redistribution and use in source and binary forms, with or without
-    modification, are permitted provided that the following conditions are met:
-
-      1. Redistributions of source code must retain the above copyright notice,
-         this list of conditions and the following disclaimer.
-
-      2. Redistributions in binary form must reproduce the above copyright 
-         notice, this list of conditions and the following disclaimer in 
-         the documentation and/or other materials provided with the distribution.
-
-      3. The names of the authors may not be used to endorse or promote products
-         derived from this software without specific prior written permission.
-
-    THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
-    INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
-    FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT,
-    INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
-    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
-    OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-    LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-    NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
-    EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-This product bundles 'JCraft Jzlib' which is available under a 3-Clause BSD License.
-
-    Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc. 
-    All rights reserved.
-
-    Redistribution and use in source and binary forms, with or without
-    modification, are permitted provided that the following conditions are met:
-
-      1. Redistributions of source code must retain the above copyright notice,
-         this list of conditions and the following disclaimer.
-
-      2. Redistributions in binary form must reproduce the above copyright 
-         notice, this list of conditions and the following disclaimer in 
-         the documentation and/or other materials provided with the distribution.
-
-      3. The names of the authors may not be used to endorse or promote products
-         derived from this software without specific prior written permission.
-
-    THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
-    INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
-    FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT,
-    INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
-    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
-    OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-    LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-    NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
-    EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
 This product bundles 'asm' which is available under a 3-Clause BSD style license.
 For details see http://asm.ow2.org/asmdex-license.html
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
index 4e337b7..1cbcfd8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
@@ -226,6 +226,18 @@ The following binary components are provided under the Apache Software License v
 
         Copyright 2018 simple-syslog-5424 authors.
 
+  (ASLv2) sshj
+    The following NOTICE information applie:
+
+      sshj - SSHv2 library for Java
+      Copyright 2010-2012 sshj contributors
+
+      This product includes code derived from software developed at
+      The Apache Software Foundation (http://www.apache.org/):
+
+      - Apache MINA SSHD
+      - Apache Commons-Net
+
 ************************
 Common Development and Distribution License 1.1
 ************************
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index f7e5621..0b15be0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -123,12 +123,8 @@
             <version>1.10.0-SNAPSHOT</version>
         </dependency>
         <dependency>
-            <groupId>com.jcraft</groupId>
-            <artifactId>jsch</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.jcraft</groupId>
-            <artifactId>jzlib</artifactId>
+            <groupId>com.hierynomus</groupId>
+            <artifactId>sshj</artifactId>
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPConnection.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPConnection.java
deleted file mode 100644
index b07c320..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPConnection.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import com.jcraft.jsch.ChannelSftp;
-import com.jcraft.jsch.Session;
-import java.io.Closeable;
-import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-public final class SFTPConnection implements Closeable {
-
-    private static final Log logger = LogFactory.getLog(SFTPConnection.class);
-    private final Session session;
-    private final ChannelSftp sftp;
-
-    public SFTPConnection(final Session session, final ChannelSftp sftp) {
-        this.session = session;
-        this.sftp = sftp;
-    }
-
-    public Session getSession() {
-        return session;
-    }
-
-    public ChannelSftp getSftp() {
-        return sftp;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (null == sftp) {
-            return;
-        }
-        try {
-            if (null != session) {
-                session.disconnect();
-            }
-        } catch (final Exception ex) {
-            /*IGNORE*/
-            logger.warn("Unable to disconnect session due to " + ex);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", ex);
-            }
-        }
-        try {
-            if (null != sftp) {
-                sftp.exit();
-            }
-        } catch (final Exception ex) {
-            /*IGNORE*/
-            logger.warn("Unable to disconnect session due to " + ex);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", ex);
-            }
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPFlowFileSourceFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPFlowFileSourceFile.java
new file mode 100644
index 0000000..72738e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPFlowFileSourceFile.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import net.schmizz.sshj.xfer.LocalFileFilter;
+import net.schmizz.sshj.xfer.LocalSourceFile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+
+/**
+ * Implementation of SSHJ's LocalSourceFile so we can call 'put' using a FlowFile's InputStream.
+ */
+public class SFTPFlowFileSourceFile implements LocalSourceFile {
+
+    private final String filename;
+    private final InputStream inputStream;
+    private final int permissions;
+
+    public SFTPFlowFileSourceFile(final String filename, final InputStream inputStream) {
+        this(filename, inputStream, 0);
+    }
+
+    public SFTPFlowFileSourceFile(final String filename, final InputStream inputStream, final int perms) {
+        this.filename = filename;
+        this.inputStream = inputStream;
+        this.permissions = perms;
+    }
+
+    @Override
+    public String getName() {
+        return filename;
+    }
+
+    @Override
+    public long getLength() {
+        return 0;
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return inputStream;
+    }
+
+    @Override
+    public int getPermissions() throws IOException {
+        return permissions;
+    }
+
+    @Override
+    public boolean isFile() {
+        return true;
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return false;
+    }
+
+    @Override
+    public Iterable<? extends LocalSourceFile> getChildren(LocalFileFilter filter) throws IOException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean providesAtimeMtime() {
+        return false;
+    }
+
+    @Override
+    public long getLastAccessTime() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public long getLastModifiedTime() throws IOException {
+        return 0;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index 34bc0e3..5180582 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -16,10 +16,50 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import net.schmizz.keepalive.KeepAlive;
+import net.schmizz.keepalive.KeepAliveProvider;
+import net.schmizz.sshj.DefaultConfig;
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.connection.ConnectionException;
+import net.schmizz.sshj.connection.ConnectionImpl;
+import net.schmizz.sshj.sftp.FileAttributes;
+import net.schmizz.sshj.sftp.FileMode;
+import net.schmizz.sshj.sftp.RemoteFile;
+import net.schmizz.sshj.sftp.RemoteResourceFilter;
+import net.schmizz.sshj.sftp.RemoteResourceInfo;
+import net.schmizz.sshj.sftp.Response;
+import net.schmizz.sshj.sftp.SFTPClient;
+import net.schmizz.sshj.sftp.SFTPException;
+import net.schmizz.sshj.transport.TransportException;
+import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
+import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
+import net.schmizz.sshj.userauth.method.AuthMethod;
+import net.schmizz.sshj.userauth.method.AuthPassword;
+import net.schmizz.sshj.userauth.method.AuthPublickey;
+import net.schmizz.sshj.userauth.password.PasswordFinder;
+import net.schmizz.sshj.userauth.password.PasswordUtils;
+import net.schmizz.sshj.xfer.FilePermission;
+import net.schmizz.sshj.xfer.LocalSourceFile;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+import javax.net.SocketFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.Proxy;
+import java.net.Socket;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.text.DateFormat;
@@ -29,34 +69,10 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Locale;
-import java.util.Properties;
-import java.util.Vector;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
-import com.jcraft.jsch.ProxySOCKS5;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.proxy.ProxyConfiguration;
-import org.apache.nifi.proxy.ProxySpec;
-import org.slf4j.LoggerFactory;
-
-import com.jcraft.jsch.ChannelSftp;
-import com.jcraft.jsch.ChannelSftp.LsEntry;
-import com.jcraft.jsch.ChannelSftp.LsEntrySelector;
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.ProxyHTTP;
-import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
-
 import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
 
 public class SFTPTransfer implements FileTransfer {
@@ -107,8 +123,8 @@ public class SFTPTransfer implements FileTransfer {
 
 
     /**
-     * Property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling
-     * {@link ChannelSftp#mkdir(String)}. In most cases, the code should call ls before mkdir, but some weird permission setups (chmod 100) on a directory would cause the 'ls' to throw a permission
+     * Property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link SFTPClient#ls(String)} before calling
+     * {@link SFTPClient#mkdir(String)}. In most cases, the code should call ls before mkdir, but some weird permission setups (chmod 100) on a directory would cause the 'ls' to throw a permission
      * exception.
      */
     public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder()
@@ -132,19 +148,21 @@ public class SFTPTransfer implements FileTransfer {
 
     private final ComponentLog logger;
 
-    private final ProcessContext ctx;
-    private Session session;
-    private ChannelSftp sftp;
-    private boolean closed = false;
+    private final PropertyContext ctx;
+
+    private SSHClient sshClient;
+    private SFTPClient sftpClient;
+
+    private volatile boolean closed = false;
     private String homeDir;
 
     private final boolean disableDirectoryListing;
 
-    public SFTPTransfer(final ProcessContext processContext, final ComponentLog logger) {
-        this.ctx = processContext;
+    public SFTPTransfer(final PropertyContext propertyContext, final ComponentLog logger) {
+        this.ctx = propertyContext;
         this.logger = logger;
 
-        final PropertyValue disableListing = processContext.getProperty(DISABLE_DIRECTORY_LISTING);
+        final PropertyValue disableListing = propertyContext.getProperty(DISABLE_DIRECTORY_LISTING);
         disableDirectoryListing = disableListing == null ? false : Boolean.TRUE.equals(disableListing.asBoolean());
     }
 
@@ -209,72 +227,67 @@ public class SFTPTransfer implements FileTransfer {
             }
         }
 
-        final ChannelSftp sftp = getChannel(null);
+        final SFTPClient sftpClient = getSFTPClient(null);
         final boolean isPathMatch = pathFilterMatches;
 
         //subDirs list is used for both 'sub directories' and 'symlinks'
-        final List<LsEntry> subDirs = new ArrayList<>();
+        final List<RemoteResourceInfo> subDirs = new ArrayList<>();
         try {
-            final LsEntrySelector filter = new LsEntrySelector() {
-                @Override
-                public int select(final LsEntry entry) {
-                    final String entryFilename = entry.getFilename();
+            final RemoteResourceFilter filter = (entry) -> {
+                final String entryFilename = entry.getName();
 
-                    // skip over 'this directory' and 'parent directory' special
-                    // files regardless of ignoring dot files
-                    if (entryFilename.equals(".") || entryFilename.equals("..")) {
-                        return LsEntrySelector.CONTINUE;
-                    }
+                // skip over 'this directory' and 'parent directory' special files regardless of ignoring dot files
+                if (entryFilename.equals(".") || entryFilename.equals("..")) {
+                    return false;
+                }
 
-                    // skip files and directories that begin with a dot if we're
-                    // ignoring them
-                    if (ignoreDottedFiles && entryFilename.startsWith(".")) {
-                        return LsEntrySelector.CONTINUE;
-                    }
+                // skip files and directories that begin with a dot if we're ignoring them
+                if (ignoreDottedFiles && entryFilename.startsWith(".")) {
+                    return false;
+                }
 
-                    // if is a directory and we're supposed to recurse
-                    // OR if is a link and we're supposed to follow symlink
-                    if ((recurse && entry.getAttrs().isDir()) || (symlink && entry.getAttrs().isLink())){
-                        subDirs.add(entry);
-                        return LsEntrySelector.CONTINUE;
-                    }
+                // if is a directory and we're supposed to recurse OR if is a link and we're supposed to follow symlink
+                if ((recurse && entry.isDirectory()) || (symlink && (entry.getAttributes().getType() == FileMode.Type.SYMLINK))){
+                    subDirs.add(entry);
+                    return false;
+                }
 
-                    // if is not a directory and is not a link and it matches
-                    // FILE_FILTER_REGEX - then let's add it
-                    if (!entry.getAttrs().isDir() && !entry.getAttrs().isLink() && isPathMatch) {
-                        if (pattern == null || pattern.matcher(entryFilename).matches()) {
-                            listing.add(newFileInfo(entry, path));
-                        }
-                    }
+                // Since SSHJ does not have the concept of BREAK that JSCH had, we need to move this before the call to listing.add
+                // below, otherwise we would keep adding to the listings since returning false here doesn't break
+                if (listing.size() >= maxResults) {
+                    return false;
+                }
 
-                    if (listing.size() >= maxResults) {
-                        return LsEntrySelector.BREAK;
+                // if is not a directory and is not a link and it matches FILE_FILTER_REGEX - then let's add it
+                if (!entry.isDirectory() && !(entry.getAttributes().getType() == FileMode.Type.SYMLINK) && isPathMatch) {
+                    if (pattern == null || pattern.matcher(entryFilename).matches()) {
+                        listing.add(newFileInfo(entry, path));
                     }
-
-                    return LsEntrySelector.CONTINUE;
                 }
 
+                return false;
             };
 
             if (path == null || path.trim().isEmpty()) {
-                sftp.ls(".", filter);
+                sftpClient.ls(".", filter);
             } else {
-                sftp.ls(path, filter);
+                sftpClient.ls(path, filter);
             }
-        } catch (final SftpException e) {
+        } catch (final SFTPException e) {
             final String pathDesc = path == null ? "current directory" : path;
-            switch (e.id) {
-                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+            switch (e.getStatusCode()) {
+                case NO_SUCH_FILE:
                     throw new FileNotFoundException("Could not perform listing on " + pathDesc + " because could not find the file on the remote server");
-                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                case PERMISSION_DENIED:
                     throw new PermissionDeniedException("Could not perform listing on " + pathDesc + " due to insufficient permissions");
                 default:
-                    throw new IOException(String.format("Failed to obtain file listing for %s due to unexpected SSH_FXP_STATUS (%d)", pathDesc, e.id), e);
+                    throw new IOException(String.format("Failed to obtain file listing for %s due to unexpected SSH_FXP_STATUS (%d)",
+                            pathDesc, e.getStatusCode().getCode()), e);
             }
         }
 
-        for (final LsEntry entry : subDirs) {
-            final String entryFilename = entry.getFilename();
+        for (final RemoteResourceInfo entry : subDirs) {
+            final String entryFilename = entry.getName();
             final File newFullPath = new File(path, entryFilename);
             final String newFullForwardPath = newFullPath.getPath().replace("\\", "/");
 
@@ -287,30 +300,48 @@ public class SFTPTransfer implements FileTransfer {
 
     }
 
-    private FileInfo newFileInfo(final LsEntry entry, String path) {
+    private FileInfo newFileInfo(final RemoteResourceInfo entry, String path) {
         if (entry == null) {
             return null;
         }
-        final File newFullPath = new File(path, entry.getFilename());
+        final File newFullPath = new File(path, entry.getName());
         final String newFullForwardPath = newFullPath.getPath().replace("\\", "/");
 
-        String perms = entry.getAttrs().getPermissionsString();
-        if (perms.length() > 9) {
-            perms = perms.substring(perms.length() - 9);
-        }
+        final StringBuilder permsBuilder = new StringBuilder();
+        final Set<FilePermission> permissions = entry.getAttributes().getPermissions();
+
+        appendPermission(permsBuilder, permissions, FilePermission.USR_R, "r");
+        appendPermission(permsBuilder, permissions, FilePermission.USR_W, "w");
+        appendPermission(permsBuilder, permissions, FilePermission.USR_X, "x");
+
+        appendPermission(permsBuilder, permissions, FilePermission.GRP_R, "r");
+        appendPermission(permsBuilder, permissions, FilePermission.GRP_W, "w");
+        appendPermission(permsBuilder, permissions, FilePermission.GRP_X, "x");
 
-        FileInfo.Builder builder = new FileInfo.Builder()
-            .filename(entry.getFilename())
+        appendPermission(permsBuilder, permissions, FilePermission.OTH_R, "r");
+        appendPermission(permsBuilder, permissions, FilePermission.OTH_W, "w");
+        appendPermission(permsBuilder, permissions, FilePermission.OTH_X, "x");
+
+        final FileInfo.Builder builder = new FileInfo.Builder()
+            .filename(entry.getName())
             .fullPathFileName(newFullForwardPath)
-            .directory(entry.getAttrs().isDir())
-            .size(entry.getAttrs().getSize())
-            .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
-            .permissions(perms)
-            .owner(Integer.toString(entry.getAttrs().getUId()))
-            .group(Integer.toString(entry.getAttrs().getGId()));
+            .directory(entry.isDirectory())
+            .size(entry.getAttributes().getSize())
+            .lastModifiedTime(entry.getAttributes().getMtime() * 1000L)
+            .permissions(permsBuilder.toString())
+            .owner(Integer.toString(entry.getAttributes().getUID()))
+            .group(Integer.toString(entry.getAttributes().getGID()));
         return builder.build();
     }
 
+    private void appendPermission(final StringBuilder builder, final Set<FilePermission> permissions, final FilePermission filePermission, final String permString) {
+        if (permissions.contains(filePermission)) {
+            builder.append(permString);
+        } else {
+            builder.append("-");
+        }
+    }
+
     @Override
     public InputStream getInputStream(final String remoteFileName) throws IOException {
         return getInputStream(remoteFileName, null);
@@ -318,14 +349,18 @@ public class SFTPTransfer implements FileTransfer {
 
     @Override
     public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
-        final ChannelSftp sftp = getChannel(flowFile);
+        final SFTPClient sftpClient = getSFTPClient(flowFile);
         try {
-            return sftp.get(remoteFileName);
-        } catch (final SftpException e) {
-            switch (e.id) {
-                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+            // The client has 'get' methods for downloading a file, but they don't offer a way to get access to an InputStream so
+            // this code is what the SFTPTransfer Downloader does to get a stream for the remote file contents
+            final RemoteFile rf = sftpClient.open(remoteFileName);
+            final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16);
+            return rfis;
+        } catch (final SFTPException e) {
+            switch (e.getStatusCode()) {
+                case NO_SUCH_FILE:
                     throw new FileNotFoundException("Could not find file " + remoteFileName + " on remote SFTP Server");
-                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                case PERMISSION_DENIED:
                     throw new PermissionDeniedException("Insufficient permissions to read file " + remoteFileName + " from remote SFTP Server", e);
                 default:
                     throw new IOException("Failed to obtain file content for " + remoteFileName, e);
@@ -345,14 +380,15 @@ public class SFTPTransfer implements FileTransfer {
 
     @Override
     public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
+        final SFTPClient sftpClient = getSFTPClient(flowFile);
         final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName;
         try {
-            sftp.rm(fullPath);
-        } catch (final SftpException e) {
-            switch (e.id) {
-                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+            sftpClient.rm(fullPath);
+        } catch (final SFTPException e) {
+            switch (e.getStatusCode()) {
+                case NO_SUCH_FILE:
                     throw new FileNotFoundException("Could not find file " + remoteFileName + " to remove from remote SFTP Server");
-                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                case PERMISSION_DENIED:
                     throw new PermissionDeniedException("Insufficient permissions to delete file " + remoteFileName + " from remote SFTP Server", e);
                 default:
                     throw new IOException("Failed to delete remote file " + fullPath, e);
@@ -362,16 +398,17 @@ public class SFTPTransfer implements FileTransfer {
 
     @Override
     public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException {
+        final SFTPClient sftpClient = getSFTPClient(flowFile);
         try {
-            sftp.rm(remoteDirectoryName);
-        } catch (final SftpException e) {
+            sftpClient.rmdir(remoteDirectoryName);
+        } catch (final SFTPException e) {
             throw new IOException("Failed to delete remote directory " + remoteDirectoryName, e);
         }
     }
 
     @Override
     public void ensureDirectoryExists(final FlowFile flowFile, final File directoryName) throws IOException {
-        final ChannelSftp channel = getChannel(flowFile);
+        final SFTPClient sftpClient = getSFTPClient(flowFile);
         final String remoteDirectory = directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", "");
 
         // if we disable the directory listing, we just want to blindly perform the mkdir command,
@@ -379,16 +416,16 @@ public class SFTPTransfer implements FileTransfer {
         if (disableDirectoryListing) {
             try {
                 // Blindly create the dir.
-                channel.mkdir(remoteDirectory);
+                sftpClient.mkdir(remoteDirectory);
                 // The remote directory did not exist, and was created successfully.
                 return;
-            } catch (SftpException e) {
-                if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
+            } catch (SFTPException e) {
+                if (e.getStatusCode() == Response.StatusCode.NO_SUCH_FILE) {
                     // No Such File. This happens when parent directory was not found.
                     logger.debug(String.format("Could not create %s due to 'No such file'. Will try to create the parent dir.", remoteDirectory));
-                } else if (e.id == ChannelSftp.SSH_FX_FAILURE) {
+                } else if (e.getStatusCode() == Response.StatusCode.FAILURE) {
                     // Swallow '4: Failure' including the remote directory already exists.
-                    logger.debug("Could not blindly create remote directory due to " + e.getMessage(), e);
+                    logger.debug("Could not blindly create remote directory due to " + getMessage(e), e);
                     return;
                 } else {
                     throw new IOException("Could not blindly create remote directory due to " + e.getMessage(), e);
@@ -397,12 +434,12 @@ public class SFTPTransfer implements FileTransfer {
         } else {
             try {
                 // Check dir existence.
-                channel.stat(remoteDirectory);
+                sftpClient.stat(remoteDirectory);
                 // The remote directory already exists.
                 return;
-            } catch (final SftpException e) {
-                if (e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE) {
-                    throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e);
+            } catch (final SFTPException e) {
+                if (e.getStatusCode() != Response.StatusCode.NO_SUCH_FILE) {
+                    throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + getMessage(e), e);
                 }
             }
         }
@@ -413,112 +450,181 @@ public class SFTPTransfer implements FileTransfer {
         }
         logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
         try {
-            channel.mkdir(remoteDirectory);
+            sftpClient.mkdir(remoteDirectory);
             logger.debug("Created {}", new Object[] {remoteDirectory});
-        } catch (final SftpException e) {
-            throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
+        } catch (final SFTPException e) {
+            throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + getMessage(e), e);
+        }
+    }
+
+    private String getMessage(final SFTPException e) {
+        if (e.getStatusCode() != null) {
+            return e.getStatusCode().getCode() + ": " + e.getMessage();
+        } else {
+            return e.getMessage();
         }
     }
 
-    protected ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
-        if (sftp != null) {
-            String sessionhost = session.getHost();
-            String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
-            if (sessionhost.equals(desthost)) {
+    private static KeepAliveProvider NO_OP_KEEP_ALIVE = new KeepAliveProvider() {
+        @Override
+        public KeepAlive provide(final ConnectionImpl connection) {
+            return new KeepAlive(connection, "no-op-keep-alive") {
+                @Override
+                protected void doKeepAlive() throws TransportException, ConnectionException {
+                    // do nothing;
+                }
+            };
+        }
+    };
+
+    protected SFTPClient getSFTPClient(final FlowFile flowFile) throws IOException {
+        // If the client is already initialized then compare the host that the client is connected to with the current
+        // host from the properties/flow-file, and if different then we need to close and reinitialize, if same we can reuse
+        if (sftpClient != null) {
+            final String clientHost = sshClient.getRemoteHostname();
+            final String propertiesHost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+            if (clientHost.equals(propertiesHost)) {
                 // destination matches so we can keep our current session
-                return sftp;
+                return sftpClient;
             } else {
                 // this flowFile is going to a different destination, reset session
                 close();
             }
         }
 
-        final JSch jsch = new JSch();
-        try {
-            final String username = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
-            final Session session = jsch.getSession(username,
-                ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
-                ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
-
-            final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
-            switch (proxyConfig.getProxyType()) {
-                case HTTP:
-                    final ProxyHTTP proxyHTTP = new ProxyHTTP(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort());
-                    // Check if Username is set and populate the proxy accordingly
-                    if (proxyConfig.hasCredential()) {
-                        proxyHTTP.setUserPasswd(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
-                    }
-                    session.setProxy(proxyHTTP);
-                    break;
-                case SOCKS:
-                    final ProxySOCKS5 proxySOCKS5 = new ProxySOCKS5(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort());
-                    if (proxyConfig.hasCredential()) {
-                        proxySOCKS5.setUserPasswd(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
-                    }
-                    session.setProxy(proxySOCKS5);
-                    break;
+        // Initialize a new SSHClient...
 
-            }
+        // If use keep-alive is set then set the provider which sends max of 5 keep-alives, otherwise set the no-op provider
+        final DefaultConfig sshClientConfig = new DefaultConfig();
+        final boolean useKeepAliveOnTimeout = ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
+        if (useKeepAliveOnTimeout) {
+            sshClientConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
+        } else {
+            sshClientConfig.setKeepAliveProvider(NO_OP_KEEP_ALIVE);
+        }
 
-            final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
-            if (hostKeyVal != null) {
-                jsch.setKnownHosts(hostKeyVal);
-            }
+        final SSHClient sshClient = new SSHClient(sshClientConfig);
+
+        // Create a Proxy if the config was specified, proxy will be null if type was NO_PROXY
+        final Proxy proxy;
+        final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
+        switch (proxyConfig.getProxyType()) {
+            case HTTP:
+            case SOCKS:
+                proxy = proxyConfig.createProxy();
+                break;
+            default:
+                proxy = null;
+                break;
+        }
 
-            final Properties properties = new Properties();
-            properties.setProperty("StrictHostKeyChecking", ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean() ? "yes" : "no");
-            properties.setProperty("PreferredAuthentications", "publickey,password,keyboard-interactive");
+        // If a proxy was specified, configure the client to use a SocketFactory that creates Sockets using the proxy
+        if (proxy != null) {
+            sshClient.setSocketFactory(new SocketFactory() {
+                @Override
+                public Socket createSocket() {
+                    return new Socket(proxy);
+                }
 
-            final PropertyValue compressionValue = ctx.getProperty(FileTransfer.USE_COMPRESSION);
-            if (compressionValue != null && "true".equalsIgnoreCase(compressionValue.getValue())) {
-                properties.setProperty("compression.s2c", "zlib@openssh.com,zlib,none");
-                properties.setProperty("compression.c2s", "zlib@openssh.com,zlib,none");
-            } else {
-                properties.setProperty("compression.s2c", "none");
-                properties.setProperty("compression.c2s", "none");
-            }
+                @Override
+                public Socket createSocket(String s, int i) {
+                    return new Socket(proxy);
+                }
 
-            session.setConfig(properties);
+                @Override
+                public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) {
+                    return new Socket(proxy);
+                }
 
-            final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
-            if (privateKeyFile != null) {
-                jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue());
-            }
+                @Override
+                public Socket createSocket(InetAddress inetAddress, int i) {
+                    return new Socket(proxy);
+                }
 
-            final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
-            if (password != null) {
-                session.setPassword(password);
-            }
+                @Override
+                public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) {
+                    return new Socket(proxy);
+                }
+            });
+        }
 
-            final int connectionTimeoutMillis = ctx.getProperty(FileTransfer.CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-            session.setTimeout(connectionTimeoutMillis);
-            session.connect();
-            this.session = session;
-            this.closed = false;
-
-            sftp = (ChannelSftp) session.openChannel("sftp");
-            sftp.connect(connectionTimeoutMillis);
-            session.setTimeout(ctx.getProperty(FileTransfer.DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-            if (!ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean()) {
-                session.setServerAliveCountMax(0); // do not send keepalive message on SocketTimeoutException
-            }
-            try {
-                this.homeDir = sftp.getHome();
-            } catch (SftpException e) {
-                // For some combination of server configuration and user home directory, getHome() can fail with "2: File not found"
-                // Since  homeDir is only used tor SEND provenance event transit uri, this is harmless. Log and continue.
-                logger.debug("Failed to retrieve {} home directory due to {}", new Object[]{username, e.getMessage()});
-            }
-            return sftp;
+        // Load known hosts file if specified, otherwise load default
+        final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
+        if (hostKeyVal != null) {
+            sshClient.loadKnownHosts(new File(hostKeyVal));
+        } else {
+            sshClient.loadKnownHosts();
+        }
+
+        // If strict host key checking is false, add a HostKeyVerifier that always returns true
+        final boolean strictHostKeyChecking = ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean();
+        if (!strictHostKeyChecking) {
+            sshClient.addHostKeyVerifier(new PromiscuousVerifier());
+        }
+
+        // Enable compression on the client if specified in properties
+        final PropertyValue compressionValue = ctx.getProperty(FileTransfer.USE_COMPRESSION);
+        if (compressionValue != null && "true".equalsIgnoreCase(compressionValue.getValue())) {
+            sshClient.useCompression();
+        }
+
+        // Configure connection timeout
+        final int connectionTimeoutMillis = ctx.getProperty(FileTransfer.CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        sshClient.setTimeout(connectionTimeoutMillis);
+
+        // Connect to the host and port
+        final String hostname = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final int port = ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue();
+        sshClient.connect(hostname, port);
+
+        // Setup authentication methods...
+        final List<AuthMethod> authMethods = new ArrayList<>();
+
+        // Add public-key auth if a private key is specified
+        final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        if (privateKeyFile != null) {
+            final String privateKeyPassphrase = ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue();
+            final KeyProvider keyProvider = privateKeyPassphrase == null ? sshClient.loadKeys(privateKeyFile) : sshClient.loadKeys(privateKeyFile, privateKeyPassphrase);
+            final AuthMethod publicKeyAuth = new AuthPublickey(keyProvider);
+            authMethods.add(publicKeyAuth);
+        }
+
+        // Add password auth if a password is specified
+        final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
+        if (password != null) {
+            final PasswordFinder passwordFinder = PasswordUtils.createOneOff(password.toCharArray());
+            final AuthMethod passwordAuth = new AuthPassword(passwordFinder);
+            authMethods.add(passwordAuth);
+        }
 
-        } catch (JSchException e) {
-            throw new IOException("Failed to obtain connection to remote host due to " + e.toString(), e);
+        // Authenticate...
+        final String username = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+        sshClient.auth(username, authMethods);
+
+        // At this point we are connected and can create a new SFTPClient which means everything is good
+        this.sshClient = sshClient;
+        this.sftpClient = sshClient.newSFTPClient();
+        this.closed = false;
+
+        // Configure timeout for sftp operations
+        final int dataTimeout = ctx.getProperty(FileTransfer.DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        this.sftpClient.getSFTPEngine().setTimeoutMs(dataTimeout);
+
+        // Attempt to get the home dir
+        try {
+            this.homeDir = sftpClient.canonicalize("");
+        } catch (IOException e) {
+            // For some combination of server configuration and user home directory, getHome() can fail with "2: File not found"
+            // Since  homeDir is only used tor SEND provenance event transit uri, this is harmless. Log and continue.
+            logger.debug("Failed to retrieve {} home directory due to {}", new Object[]{username, e.getMessage()});
         }
+
+        return sftpClient;
     }
 
     @Override
     public String getHomeDirectory(final FlowFile flowFile) throws IOException {
-        getChannel(flowFile);
+        getSFTPClient(flowFile);
         return this.homeDir;
     }
 
@@ -530,22 +636,22 @@ public class SFTPTransfer implements FileTransfer {
         closed = true;
 
         try {
-            if (null != sftp) {
-                sftp.exit();
+            if (null != sftpClient) {
+                sftpClient.close();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close ChannelSftp due to {}", new Object[] {ex.toString()}, ex);
+            logger.warn("Failed to close SFTPClient due to {}", new Object[] {ex.toString()}, ex);
         }
-        sftp = null;
+        sftpClient = null;
 
         try {
-            if (null != session) {
-                session.disconnect();
+            if (null != sshClient) {
+                sshClient.disconnect();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close session due to {}", new Object[] {ex.toString()}, ex);
+            logger.warn("Failed to close SSHClient due to {}", new Object[] {ex.toString()}, ex);
         }
-        session = null;
+        sshClient = null;
     }
 
     @Override
@@ -556,45 +662,39 @@ public class SFTPTransfer implements FileTransfer {
     @Override
     @SuppressWarnings("unchecked")
     public FileInfo getRemoteFileInfo(final FlowFile flowFile, final String path, String filename) throws IOException {
-        final ChannelSftp sftp = getChannel(flowFile);
-        final String fullPath;
-
-        if (path == null) {
-            fullPath = filename;
-            int slashpos = filename.lastIndexOf('/');
-            if (slashpos >= 0 && !filename.endsWith("/")) {
-                filename = filename.substring(slashpos + 1);
-            }
-        } else {
-            fullPath = path + "/" + filename;
-        }
+        final SFTPClient sftpClient = getSFTPClient(flowFile);
 
-        final Vector<LsEntry> vector;
+        final List<RemoteResourceInfo> remoteResources;
         try {
-            vector = sftp.ls(fullPath);
-        } catch (final SftpException e) {
-            // ls throws exception if filename is not present
-            if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
+            remoteResources = sftpClient.ls(path);
+        } catch (final SFTPException e) {
+            if (e.getStatusCode() == Response.StatusCode.NO_SUCH_FILE) {
                 return null;
             } else {
-                throw new IOException("Failed to obtain file listing for " + fullPath, e);
+                throw new IOException("Failed to obtain file listing for " + path, e);
             }
         }
 
-        LsEntry matchingEntry = null;
-        for (final LsEntry entry : vector) {
-            if (entry.getFilename().equalsIgnoreCase(filename)) {
+        RemoteResourceInfo matchingEntry = null;
+        for (final RemoteResourceInfo entry : remoteResources) {
+            if (entry.getName().equalsIgnoreCase(filename)) {
                 matchingEntry = entry;
                 break;
             }
         }
 
-        return newFileInfo(matchingEntry, path);
+        // Previously JSCH would perform a listing on the full path (path + filename) and would get an exception when it wasn't
+        // a file and then return null, so to preserve that behavior we return null if the matchingEntry is a directory
+        if (matchingEntry != null && matchingEntry.isDirectory()) {
+            return null;
+        } else {
+            return newFileInfo(matchingEntry, path);
+        }
     }
 
     @Override
     public String put(final FlowFile flowFile, final String path, final String filename, final InputStream content) throws IOException {
-        final ChannelSftp sftp = getChannel(flowFile);
+        final SFTPClient sftpClient = getSFTPClient(flowFile);
 
         // destination path + filename
         final String fullPath = (path == null) ? filename : (path.endsWith("/")) ? path + filename : path + "/" + filename;
@@ -607,10 +707,17 @@ public class SFTPTransfer implements FileTransfer {
         }
         final String tempPath = (path == null) ? tempFilename : (path.endsWith("/")) ? path + tempFilename : path + "/" + tempFilename;
 
+        int perms = 0;
+        final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
+        if (permissions != null && !permissions.trim().isEmpty()) {
+            perms = numberPermissions(permissions);
+        }
+
         try {
-            sftp.put(content, tempPath);
-        } catch (final SftpException e) {
-            throw new IOException("Unable to put content to " + fullPath + " due to " + e, e);
+            final LocalSourceFile sourceFile = new SFTPFlowFileSourceFile(filename, content, perms);
+            sftpClient.put(sourceFile, tempPath);
+        } catch (final SFTPException e) {
+            throw new IOException("Unable to put content to " + fullPath + " due to " + getMessage(e), e);
         }
 
         final String lastModifiedTime = ctx.getProperty(LAST_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue();
@@ -619,28 +726,23 @@ public class SFTPTransfer implements FileTransfer {
                 final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
                 final Date fileModifyTime = formatter.parse(lastModifiedTime);
                 int time = (int) (fileModifyTime.getTime() / 1000L);
-                sftp.setMtime(tempPath, time);
-            } catch (final Exception e) {
-                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {tempPath, lastModifiedTime, e});
-            }
-        }
 
-        final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
-        if (permissions != null && !permissions.trim().isEmpty()) {
-            try {
-                int perms = numberPermissions(permissions);
-                if (perms >= 0) {
-                    sftp.chmod(perms, tempPath);
-                }
+                final FileAttributes tempAttributes = sftpClient.stat(tempPath);
+
+                final FileAttributes modifiedAttributes = new FileAttributes.Builder()
+                        .withAtimeMtime(tempAttributes.getAtime(), time)
+                        .build();
+
+                sftpClient.setattr(tempPath, modifiedAttributes);
             } catch (final Exception e) {
-                logger.error("Failed to set permission on {} to {} due to {}", new Object[] {tempPath, permissions, e});
+                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {tempPath, lastModifiedTime, e});
             }
         }
 
         final String owner = ctx.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
         if (owner != null && !owner.trim().isEmpty()) {
             try {
-                sftp.chown(Integer.parseInt(owner), tempPath);
+                sftpClient.chown(tempPath, Integer.parseInt(owner));
             } catch (final Exception e) {
                 logger.error("Failed to set owner on {} to {} due to {}", new Object[] {tempPath, owner, e});
             }
@@ -649,7 +751,7 @@ public class SFTPTransfer implements FileTransfer {
         final String group = ctx.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
         if (group != null && !group.trim().isEmpty()) {
             try {
-                sftp.chgrp(Integer.parseInt(group), tempPath);
+                sftpClient.chgrp(tempPath, Integer.parseInt(group));
             } catch (final Exception e) {
                 logger.error("Failed to set group on {} to {} due to {}", new Object[] {tempPath, group, e});
             }
@@ -657,12 +759,12 @@ public class SFTPTransfer implements FileTransfer {
 
         if (!filename.equals(tempFilename)) {
             try {
-                sftp.rename(tempPath, fullPath);
-            } catch (final SftpException e) {
+                sftpClient.rename(tempPath, fullPath);
+            } catch (final SFTPException e) {
                 try {
-                    sftp.rm(tempPath);
-                    throw new IOException("Failed to rename dot-file to " + fullPath + " due to " + e, e);
-                } catch (final SftpException e1) {
+                    sftpClient.rm(tempPath);
+                    throw new IOException("Failed to rename dot-file to " + fullPath + " due to " + getMessage(e), e);
+                } catch (final SFTPException e1) {
                     throw new IOException("Failed to rename dot-file to " + fullPath + " and failed to delete it when attempting to clean up", e1);
                 }
             }
@@ -673,14 +775,14 @@ public class SFTPTransfer implements FileTransfer {
 
     @Override
     public void rename(final FlowFile flowFile, final String source, final String target) throws IOException {
-        final ChannelSftp sftp = getChannel(flowFile);
+        final SFTPClient sftpClient = getSFTPClient(flowFile);
         try {
-            sftp.rename(source, target);
-        } catch (final SftpException e) {
-            switch (e.id) {
-                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+            sftpClient.rename(source, target);
+        } catch (final SFTPException e) {
+            switch (e.getStatusCode()) {
+                case NO_SUCH_FILE:
                     throw new FileNotFoundException("No such file or directory");
-                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                case PERMISSION_DENIED:
                     throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
                 default:
                     throw new IOException(e);
@@ -730,17 +832,4 @@ public class SFTPTransfer implements FileTransfer {
         return number;
     }
 
-    static {
-        JSch.setLogger(new com.jcraft.jsch.Logger() {
-            @Override
-            public boolean isEnabled(int level) {
-                return true;
-            }
-
-            @Override
-            public void log(int level, String message) {
-                LoggerFactory.getLogger(SFTPTransfer.class).debug("SFTP Log: {}", message);
-            }
-        });
-    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPUtils.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPUtils.java
deleted file mode 100644
index fc6275f..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPUtils.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.Hashtable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Matcher;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.jcraft.jsch.ChannelSftp;
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
-
-public class SFTPUtils {
-
-    public static final PropertyDescriptor SFTP_PRIVATEKEY_PATH = new PropertyDescriptor.Builder()
-            .required(false)
-            .description("sftp.privatekey.path")
-            .defaultValue(null)
-            .name("sftp.privatekey.path")
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .sensitive(false)
-            .build();
-    public static final PropertyDescriptor REMOTE_PASSWORD = new PropertyDescriptor.Builder()
-            .required(false)
-            .description("remote.password")
-            .defaultValue(null)
-            .name("remote.password")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
-    public static final PropertyDescriptor SFTP_PRIVATEKEY_PASSPHRASE = new PropertyDescriptor.Builder()
-            .required(false)
-            .description("sftp.privatekey.passphrase")
-            .defaultValue(null)
-            .name("sftp.privatekey.passphrase")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
-    public static final PropertyDescriptor SFTP_PORT = new PropertyDescriptor.Builder()
-            .required(false)
-            .description("sftp.port")
-            .defaultValue(null)
-            .name("sftp.port")
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .sensitive(false)
-            .build();
-    public static final PropertyDescriptor NETWORK_DATA_TIMEOUT = new PropertyDescriptor.Builder()
-            .required(false)
-            .description("network.data.timeout")
-            .defaultValue(null)
-            .name("network.data.timeout")
-            .addValidator(StandardValidators.INTEGER_VALIDATOR)
-            .sensitive(false)
-            .build();
-    public static final PropertyDescriptor SFTP_HOSTKEY_FILENAME = new PropertyDescriptor.Builder()
-            .required(false)
-            .description("sftp.hostkey.filename")
-            .defaultValue(null)
-            .name("sftp.hostkey.filename")
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .sensitive(false)
-            .build();
-    public static final PropertyDescriptor NETWORK_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
-            .required(false)
-            .description("network.connection.timeout")
-            .defaultValue(null)
-            .name("network.connection.timeout")
-            .addValidator(StandardValidators.INTEGER_VALIDATOR)
-            .sensitive(false)
-            .build();
-
-    // required properties
-    public static final PropertyDescriptor REMOTE_HOSTNAME = new PropertyDescriptor.Builder()
-            .required(true)
-            .description("remote.hostname")
-            .defaultValue(null)
-            .name("remote.hostname")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(false)
-            .build();
-    public static final PropertyDescriptor REMOTE_USERNAME = new PropertyDescriptor.Builder()
-            .required(true)
-            .description("remote.username")
-            .defaultValue(null)
-            .name("remote.username")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(false)
-            .build();
-
-    private static final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
-
-    static {
-        JSch.setLogger(SFTPUtils.createLogger());
-        propertyDescriptors.add(SFTP_PRIVATEKEY_PATH);
-        propertyDescriptors.add(REMOTE_PASSWORD);
-        propertyDescriptors.add(SFTP_PRIVATEKEY_PASSPHRASE);
-        propertyDescriptors.add(SFTP_PORT);
-        propertyDescriptors.add(NETWORK_DATA_TIMEOUT);
-        propertyDescriptors.add(SFTP_HOSTKEY_FILENAME);
-        propertyDescriptors.add(NETWORK_CONNECTION_TIMEOUT);
-        propertyDescriptors.add(REMOTE_USERNAME);
-        propertyDescriptors.add(REMOTE_HOSTNAME);
-    }
-
-    private static final Log logger = LogFactory.getLog(SFTPUtils.class);
-
-    public static List<PropertyDescriptor> getPropertyDescriptors() {
-        return propertyDescriptors;
-    }
-
-    public static SFTPConnection connectSftp(final SFTPConfiguration conf) throws JSchException, SftpException, IOException {
-        final JSch jsch = new JSch();
-        final Session session = SFTPUtils.createSession(conf, jsch);
-        final ChannelSftp sftp = (ChannelSftp) session.openChannel("sftp");
-        sftp.connect();
-        return new SFTPConnection(session, sftp);
-    }
-
-    public static void changeWorkingDirectory(final ChannelSftp sftp, final String dirPath, final boolean createDirs, final Processor proc) throws IOException {
-        final Deque<String> stack = new LinkedList<>();
-        File dir = new File(dirPath);
-        String currentWorkingDirectory = null;
-        boolean dirExists = false;
-        final String forwardPaths = dir.getPath().replaceAll(Matcher.quoteReplacement("\\"), Matcher.quoteReplacement("/"));
-        try {
-            currentWorkingDirectory = sftp.pwd();
-            logger.debug(proc + " attempting to change directory from " + currentWorkingDirectory + " to " + dir.getPath());
-            //always use forward paths for long string attempt
-            sftp.cd(forwardPaths);
-            dirExists = true;
-            logger.debug(proc + " changed working directory to '" + forwardPaths + "' from '" + currentWorkingDirectory + "'");
-        } catch (final SftpException sftpe) {
-            logger.debug(proc + " could not change directory to '" + forwardPaths + "' from '" + currentWorkingDirectory + "' so trying the hard way.");
-        }
-        if (dirExists) {
-            return;
-        }
-        if (!createDirs) {
-            throw new IOException("Unable to change to requested working directory \'" + forwardPaths + "\' but not configured to create dirs.");
-        }
-
-        do {
-            stack.push(dir.getName());
-        } while ((dir = dir.getParentFile()) != null);
-
-        String dirName = null;
-        while ((dirName = stack.peek()) != null) {
-            stack.pop();
-            //find out if exists, if not make it if configured to do so or throw exception
-            dirName = ("".equals(dirName.trim())) ? "/" : dirName;
-            try {
-                sftp.cd(dirName);
-            } catch (final SftpException sftpe) {
-                logger.debug(proc + " creating new directory and changing to it " + dirName);
-                try {
-                    sftp.mkdir(dirName);
-                    sftp.cd(dirName);
-                } catch (final SftpException e) {
-                    throw new IOException(proc + " could not make/change directory to [" + dirName + "] [" + e.getLocalizedMessage() + "]", e);
-                }
-            }
-        }
-    }
-
-    public static Session createSession(final SFTPConfiguration conf, final JSch jsch) throws JSchException, IOException {
-        if (conf == null || null == jsch) {
-            throw new NullPointerException();
-        }
-
-        final Hashtable<String, String> newOptions = new Hashtable<>();
-
-        Session session = jsch.getSession(conf.username, conf.hostname, conf.port);
-
-        final String hostKeyVal = conf.hostkeyFile;
-
-        if (null != hostKeyVal) {
-            try {
-                jsch.setKnownHosts(hostKeyVal);
-            } catch (final IndexOutOfBoundsException iob) {
-                throw new IOException("Unable to establish connection due to bad known hosts key file " + hostKeyVal, iob);
-            }
-        } else {
-            newOptions.put("StrictHostKeyChecking", "no");
-            session.setConfig(newOptions);
-        }
-
-        final String privateKeyVal = conf.privatekeyFile;
-        if (null != privateKeyVal) {
-            jsch.addIdentity(privateKeyVal, conf.privateKeypassphrase);
-        }
-
-        if (null != conf.password) {
-            session.setPassword(conf.password);
-        }
-
-        session.setTimeout(conf.connectionTimeout); //set timeout for connection
-        session.connect();
-        session.setTimeout(conf.dataTimeout); //set timeout for data transfer
-
-        return session;
-    }
-
-    public static com.jcraft.jsch.Logger createLogger() {
-
-        return new com.jcraft.jsch.Logger() {
-
-            @Override
-            public boolean isEnabled(int level) {
-                return true;
-            }
-
-            @Override
-            public void log(int level, String message) {
-                logger.debug("SFTP Log: " + message);
-            }
-        };
-    }
-
-    public static class SFTPConfiguration {
-
-        private String hostname;
-        private String username;
-        private int port = 22;
-        private int connectionTimeout = 0;
-        private int dataTimeout = 0;
-        private String hostkeyFile;
-        private String privatekeyFile;
-        private String password;
-        private String privateKeypassphrase;
-
-        public SFTPConfiguration() {
-        }
-
-        public void setHostname(final String val) {
-            this.hostname = val;
-        }
-
-        public String getHostname() {
-            return hostname;
-        }
-
-        public void setUsername(final String val) {
-            this.username = val;
-        }
-
-        public String getUsername() {
-            return username;
-        }
-
-        public void setPort(final String val) {
-            if (val != null) {
-                port = Integer.parseInt(val);
-            }
-        }
-
-        public void setConnectionTimeout(final String val) {
-            if (val != null) {
-                connectionTimeout = Integer.parseInt(val);
-            }
-        }
-
-        public void setDataTimeout(final String val) {
-            if (val != null) {
-                dataTimeout = Integer.parseInt(val);
-            }
-        }
-
-        public void setHostkeyFile(final String val) {
-            this.hostkeyFile = val;
-        }
-
-        public void setPrivateKeyFile(final String val) {
-            this.privatekeyFile = val;
-        }
-
-        public void setPassword(final String val) {
-            this.password = val;
-        }
-
-        public void setPrivateKeyPassphrase(final String val) {
-            this.privateKeypassphrase = val;
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
index d2249f6..e9d9a0b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -16,8 +16,9 @@
  */
 package org.apache.nifi.processors.standard.util;
 
-import com.jcraft.jsch.ChannelSftp;
-import com.jcraft.jsch.SftpException;
+import net.schmizz.sshj.sftp.Response;
+import net.schmizz.sshj.sftp.SFTPClient;
+import net.schmizz.sshj.sftp.SFTPException;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
@@ -31,9 +32,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static com.jcraft.jsch.ChannelSftp.SSH_FX_FAILURE;
-import static com.jcraft.jsch.ChannelSftp.SSH_FX_NO_SUCH_FILE;
-import static com.jcraft.jsch.ChannelSftp.SSH_FX_PERMISSION_DENIED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.eq;
@@ -48,37 +46,37 @@ public class TestSFTPTransfer {
 
     private static final Logger logger = LoggerFactory.getLogger(TestSFTPTransfer.class);
 
-    private SFTPTransfer createSftpTransfer(ProcessContext processContext, ChannelSftp channel) {
+    private SFTPTransfer createSftpTransfer(ProcessContext processContext, SFTPClient sftpClient) {
         final ComponentLog componentLog = mock(ComponentLog.class);
         return new SFTPTransfer(processContext, componentLog) {
             @Override
-            protected ChannelSftp getChannel(FlowFile flowFile) throws IOException {
-                return channel;
+            protected SFTPClient getSFTPClient(FlowFile flowFile) throws IOException {
+                return sftpClient;
             }
         };
     }
 
     @Test
-    public void testEnsureDirectoryExistsAlreadyExisted() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsAlreadyExisted() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
-        final ChannelSftp channel = mock(ChannelSftp.class);
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
 
         // Dir existence check should be done by stat
-        verify(channel).stat(eq("/dir1/dir2/dir3"));
+        verify(sftpClient).stat(eq("/dir1/dir2/dir3"));
     }
 
     @Test
-    public void testEnsureDirectoryExistsFailedToStat() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsFailedToStat() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
-        final ChannelSftp channel = mock(ChannelSftp.class);
+        final SFTPClient sftpClient = mock(SFTPClient.class);
         // stat for the parent was successful, simulating that dir2 exists, but no dir3.
-        when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_FAILURE, "Failure"));
+        when(sftpClient.stat("/dir1/dir2/dir3")).thenThrow(new SFTPException(Response.StatusCode.FAILURE, "Failure"));
 
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         try {
@@ -89,58 +87,60 @@ public class TestSFTPTransfer {
         }
 
         // Dir existence check should be done by stat
-        verify(channel).stat(eq("/dir1/dir2/dir3"));
+        verify(sftpClient).stat(eq("/dir1/dir2/dir3"));
     }
 
     @Test
-    public void testEnsureDirectoryExistsNotExisted() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsNotExisted() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
-        final ChannelSftp channel = mock(ChannelSftp.class);
+        final SFTPClient sftpClient = mock(SFTPClient.class);
         // stat for the parent was successful, simulating that dir2 exists, but no dir3.
-        when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+        when(sftpClient.stat("/dir1/dir2/dir3")).thenThrow(new SFTPException(Response.StatusCode.NO_SUCH_FILE, "No such file"));
 
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
 
         // Dir existence check should be done by stat
-        verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
-        verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked
-        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
+        verify(sftpClient).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+        verify(sftpClient).stat(eq("/dir1/dir2")); // so, dir2 was checked
+        verify(sftpClient).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
     }
 
     @Test
-    public void testEnsureDirectoryExistsParentNotExisted() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsParentNotExisted() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
-        final ChannelSftp channel = mock(ChannelSftp.class);
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+
         // stat for the dir1 was successful, simulating that dir1 exists, but no dir2 and dir3.
-        when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
-        when(channel.stat("/dir1/dir2")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+        when(sftpClient.stat("/dir1/dir2/dir3")).thenThrow(new SFTPException(Response.StatusCode.NO_SUCH_FILE, "No such file"));
+        when(sftpClient.stat("/dir1/dir2")).thenThrow(new SFTPException(Response.StatusCode.NO_SUCH_FILE, "No such file"));
 
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
 
         // Dir existence check should be done by stat
-        verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
-        verify(channel).stat(eq("/dir1/dir2")); // dir2 was not found, too
-        verify(channel).stat(eq("/dir1")); // dir1 was found
-        verify(channel).mkdir(eq("/dir1/dir2")); // dir1 existed, so dir2 was created.
-        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // then dir3 was created.
+        verify(sftpClient).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+        verify(sftpClient).stat(eq("/dir1/dir2")); // dir2 was not found, too
+        verify(sftpClient).stat(eq("/dir1")); // dir1 was found
+        verify(sftpClient).mkdir(eq("/dir1/dir2")); // dir1 existed, so dir2 was created.
+        verify(sftpClient).mkdir(eq("/dir1/dir2/dir3")); // then dir3 was created.
     }
 
     @Test
-    public void testEnsureDirectoryExistsNotExistedFailedToCreate() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsNotExistedFailedToCreate() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
-        final ChannelSftp channel = mock(ChannelSftp.class);
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+
         // stat for the parent was successful, simulating that dir2 exists, but no dir3.
-        when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+        when(sftpClient.stat("/dir1/dir2/dir3")).thenThrow(new SFTPException(Response.StatusCode.NO_SUCH_FILE, "No such file"));
         // Failed to create dir3.
-        doThrow(new SftpException(SSH_FX_FAILURE, "Failed")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+        doThrow(new SFTPException(Response.StatusCode.FAILURE, "Failed")).when(sftpClient).mkdir(eq("/dir1/dir2/dir3"));
 
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         try {
@@ -151,85 +151,85 @@ public class TestSFTPTransfer {
         }
 
         // Dir existence check should be done by stat
-        verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
-        verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked
-        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
+        verify(sftpClient).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+        verify(sftpClient).stat(eq("/dir1/dir2")); // so, dir2 was checked
+        verify(sftpClient).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
     }
 
     @Test
-    public void testEnsureDirectoryExistsBlindlyNotExisted() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsBlindlyNotExisted() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
         when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
 
-        final ChannelSftp channel = mock(ChannelSftp.class);
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
 
         // stat should not be called.
-        verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
-        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
+        verify(sftpClient, times(0)).stat(eq("/dir1/dir2/dir3"));
+        verify(sftpClient).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
     }
 
     @Test
-    public void testEnsureDirectoryExistsBlindlyParentNotExisted() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsBlindlyParentNotExisted() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
         when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
 
-        final ChannelSftp channel = mock(ChannelSftp.class);
+        final SFTPClient sftpClient = mock(SFTPClient.class);
         final AtomicInteger mkdirCount = new AtomicInteger(0);
         doAnswer(invocation -> {
             final int cnt = mkdirCount.getAndIncrement();
             if (cnt == 0) {
                 // If the parent dir does not exist, no such file exception is thrown.
-                throw new SftpException(SSH_FX_NO_SUCH_FILE, "Failure");
+                throw new SFTPException(Response.StatusCode.NO_SUCH_FILE, "Failure");
             } else {
                 logger.info("Created the dir successfully for the 2nd time");
             }
             return true;
-        }).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+        }).when(sftpClient).mkdir(eq("/dir1/dir2/dir3"));
 
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
 
         // stat should not be called.
-        verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+        verify(sftpClient, times(0)).stat(eq("/dir1/dir2/dir3"));
         // dir3 was created blindly, but failed for the 1st time, and succeeded for the 2nd time.
-        verify(channel, times(2)).mkdir(eq("/dir1/dir2/dir3"));
-        verify(channel).mkdir(eq("/dir1/dir2")); // dir2 was created successfully.
+        verify(sftpClient, times(2)).mkdir(eq("/dir1/dir2/dir3"));
+        verify(sftpClient).mkdir(eq("/dir1/dir2")); // dir2 was created successfully.
     }
 
     @Test
-    public void testEnsureDirectoryExistsBlindlyAlreadyExisted() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsBlindlyAlreadyExisted() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
         when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
 
-        final ChannelSftp channel = mock(ChannelSftp.class);
+        final SFTPClient sftpClient = mock(SFTPClient.class);
         // If the dir existed, a failure exception is thrown, but should be swallowed.
-        doThrow(new SftpException(SSH_FX_FAILURE, "Failure")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+        doThrow(new SFTPException(Response.StatusCode.FAILURE, "Failure")).when(sftpClient).mkdir(eq("/dir1/dir2/dir3"));
 
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
 
         // stat should not be called.
-        verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
-        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
+        verify(sftpClient, times(0)).stat(eq("/dir1/dir2/dir3"));
+        verify(sftpClient).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
     }
 
     @Test
-    public void testEnsureDirectoryExistsBlindlyFailed() throws IOException, SftpException {
+    public void testEnsureDirectoryExistsBlindlyFailed() throws IOException, SFTPException {
         final ProcessContext processContext = mock(ProcessContext.class);
         when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
 
-        final ChannelSftp channel = mock(ChannelSftp.class);
-        doThrow(new SftpException(SSH_FX_PERMISSION_DENIED, "Permission denied")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+        final SFTPClient sftpClient = mock(SFTPClient.class);
+        doThrow(new SFTPException(Response.StatusCode.PERMISSION_DENIED, "Permission denied")).when(sftpClient).mkdir(eq("/dir1/dir2/dir3"));
 
-        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+        final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, sftpClient);
         final MockFlowFile flowFile = new MockFlowFile(0);
         final File remoteDir = new File("/dir1/dir2/dir3");
         try {
@@ -240,8 +240,8 @@ public class TestSFTPTransfer {
         }
 
         // stat should not be called.
-        verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
-        verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
+        verify(sftpClient, times(0)).stat(eq("/dir1/dir2/dir3"));
+        verify(sftpClient).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransferWithSSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransferWithSSHTestServer.java
new file mode 100644
index 0000000..d5fdf29
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransferWithSSHTestServer.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockPropertyContext;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestSFTPTransferWithSSHTestServer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestSFTPTransferWithSSHTestServer.class);
+
+    private static final String SFTP_ROOT_DIR = "target/test-sftp-transfer-vfs";
+
+    private static final String DIR_1 = "dir1";
+    private static final String DIR_2 = "dir2";
+    private static final String DIR_3 = "dir3";
+    private static final String DIR_4 = "dir4";
+
+    private static final String DIR_1_CHILD_1 = "child1";
+    private static final String DIR_1_CHILD_2 = "child2";
+
+    private static final String FILE_1 = "file1.txt";
+    private static final String FILE_2 = "file2.txt";
+    private static final String DOT_FILE = ".foo.txt";
+
+    private static SSHTestServer sshTestServer;
+
+    @BeforeClass
+    public static void setupClass() throws IOException {
+        sshTestServer = new SSHTestServer();
+        sshTestServer.setVirtualFileSystemPath(SFTP_ROOT_DIR);
+        sshTestServer.startServer();
+    }
+
+    @AfterClass
+    public static void cleanupClass() throws IOException {
+        sshTestServer.stopServer();
+    }
+
+    @Before
+    public void setupFiles() throws IOException {
+        final File sftpRootDir = new File(SFTP_ROOT_DIR);
+        FileUtils.deleteFilesInDir(sftpRootDir, null, LOGGER, true, true);
+
+        // create and initialize dir1/child1
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_1 + "/" + DIR_1_CHILD_1, FILE_1, "dir1 child1 file1");
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_1 + "/" + DIR_1_CHILD_1, FILE_2, "dir1 child1 file2");
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_1 + "/" + DIR_1_CHILD_1, DOT_FILE, "dir1 child1 foo");
+
+        // create and initialize dir1/child2
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_1 + "/" + DIR_1_CHILD_2, FILE_1, "dir1 child2 file1");
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_1 + "/" + DIR_1_CHILD_2, FILE_2, "dir1 child2 file2");
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_1 + "/" + DIR_1_CHILD_2, DOT_FILE, "dir1 child2 foo");
+
+        // create and initialize dir2
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_2, FILE_1, "dir2 file1");
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_2, FILE_2, "dir2 file2");
+        initializeFile(SFTP_ROOT_DIR + "/" + DIR_2, DOT_FILE, "dir2 foo");
+
+        // Create a symbolic link so that dir3/dir1 links to dir1 so we can test following links
+        final Path targetPath = Paths.get("../" + DIR_1);
+
+        final String dir3Path = SFTP_ROOT_DIR + "/" + DIR_3;
+        FileUtils.ensureDirectoryExistAndCanAccess(new File(dir3Path));
+        final Path linkPath = Paths.get(dir3Path + "/" + DIR_1);
+
+        Files.createSymbolicLink(linkPath, targetPath);
+
+        // create dir4 for writing files
+        final File dir4File = new File(SFTP_ROOT_DIR + "/" + DIR_4);
+        FileUtils.ensureDirectoryExistAndCanAccess(dir4File);
+    }
+
+    private void initializeFile(final String path, final String filename, final String content) throws IOException {
+        final File parent = new File(path);
+        if (!parent.exists()) {
+            assertTrue("Failed to create parent directory: " + path, parent.mkdirs());
+        }
+
+        final File file = new File(parent, filename);
+        try (final OutputStream out = new FileOutputStream(file);
+             final Writer writer = new OutputStreamWriter(out)) {
+            writer.write(content);
+            writer.flush();
+        }
+    }
+
+    @Test
+    public void testGetListingSimple() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(2, listing.size());
+
+            final FileInfo file1Info = listing.stream().filter(f -> f.getFileName().equals(FILE_1)).findFirst().orElse(null);
+            assertNotNull(file1Info);
+            assertFalse(file1Info.isDirectory());
+            assertEquals("rw-r--r--", file1Info.getPermissions());
+
+            final FileInfo file2Info = listing.stream().filter(f -> f.getFileName().equals(FILE_2)).findFirst().orElse(null);
+            assertNotNull(file2Info);
+            assertFalse(file2Info.isDirectory());
+            assertEquals("rw-r--r--", file2Info.getPermissions());
+        }
+    }
+
+    @Test
+    public void testGetListingSimpleWithDotFiles() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
+        properties.put(SFTPTransfer.IGNORE_DOTTED_FILES, "false");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(3, listing.size());
+
+            final FileInfo dotFileInfo = listing.stream().filter(f -> f.getFileName().equals(DOT_FILE)).findFirst().orElse(null);
+            assertNotNull(dotFileInfo);
+        }
+    }
+
+    @Test
+    public void testGetListingWithoutRecursiveSearch() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_1);
+        properties.put(SFTPTransfer.RECURSIVE_SEARCH, "false");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(0, listing.size());
+        }
+    }
+
+    @Test
+    public void testGetListingWithRecursiveSearch() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_1);
+        properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(4, listing.size());
+        }
+    }
+
+    @Test
+    public void testGetListingWithoutSymlinks() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_3);
+        properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
+        properties.put(SFTPTransfer.FOLLOW_SYMLINK, "false");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(0, listing.size());
+        }
+    }
+
+    @Test
+    public void testGetListingWithSymlinks() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_3);
+        properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
+        properties.put(SFTPTransfer.FOLLOW_SYMLINK, "true");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(4, listing.size());
+        }
+    }
+
+    @Test
+    public void testGetListingWithBatchSize() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_1);
+        properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
+
+        // first listing is without batch size and shows 4 results
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(4, listing.size());
+        }
+
+        // set a batch size of 2 and ensure we get 2 results
+        properties.put(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(2, listing.size());
+        }
+    }
+
+    @Test
+    public void testGetListingWithFileFilter() throws IOException {
+        final String fileFilterRegex = "file1.*";
+
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_1);
+        properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
+        properties.put(SFTPTransfer.FILE_FILTER_REGEX, fileFilterRegex);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(2, listing.size());
+
+            listing.forEach(f -> assertTrue(f.getFileName().matches(fileFilterRegex)));
+        }
+    }
+
+    @Test
+    public void testGetListingWithPathFilter() throws IOException {
+        final String remotePath = ".";
+        final String pathFilterRegex = "dir1/child1";
+
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, remotePath);
+        properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
+        properties.put(SFTPTransfer.PATH_FILTER_REGEX, pathFilterRegex);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(2, listing.size());
+
+            // a listing will have fullPathFileName like "./dir1/child1/file1.txt" so to verify the path pattern
+            // we need to remove the file part and relativize based on the remote path to get "dir1/child1"
+            listing.forEach(f -> {
+                final String filename = f.getFileName();
+                final String path = f.getFullPathFileName().replace(filename, "");
+
+                final Path fullPath = Paths.get(path);
+                final Path relPath = Paths.get(remotePath).relativize(fullPath);
+                assertTrue(relPath.toString().matches(pathFilterRegex));
+            });
+        }
+    }
+
+    @Test(expected = FileNotFoundException.class)
+    public void testGetListingWhenRemotePathDoesNotExist() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, "DOES-NOT-EXIST");
+        properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            transfer.getListing();
+        }
+    }
+
+    @Test
+    public void testGetInputStream() throws IOException {
+        final String filename = "./" + DIR_2 + "/" + FILE_1;
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
+            final InputStream in = transfer.getInputStream(filename)) {
+            final String content = IOUtils.toString(in, StandardCharsets.UTF_8);
+            assertEquals("dir2 file1", content);
+        }
+    }
+
+    @Test(expected = FileNotFoundException.class)
+    public void testGetInputStreamWhenFileDoesNotExist() throws IOException {
+        final String filename = "./" + DIR_2 + "/DOES-NOT-EXIST";
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
+            final InputStream in = transfer.getInputStream(filename)) {
+            IOUtils.toString(in, StandardCharsets.UTF_8);
+        }
+    }
+
+    @Test
+    public void testDeleteFileWithoutPath() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory has two files
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(2, listing.size());
+
+            // issue deletes for the two files
+            for (final FileInfo fileInfo : listing) {
+                transfer.deleteFile(null, null, fileInfo.getFullPathFileName());
+            }
+
+            // verify there are now zero files
+            final List<FileInfo> listingAfterDelete = transfer.getListing();
+            assertNotNull(listingAfterDelete);
+            assertEquals(0, listingAfterDelete.size());
+        }
+    }
+
+    @Test
+    public void testDeleteFileWithPath() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory has two files
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(2, listing.size());
+
+            // issue deletes for the two files
+            for (final FileInfo fileInfo : listing) {
+                final String filename = fileInfo.getFileName();
+                final String path = fileInfo.getFullPathFileName().replace(filename, "");
+                transfer.deleteFile(null, path, filename);
+            }
+
+            // verify there are now zero files
+            final List<FileInfo> listingAfterDelete = transfer.getListing();
+            assertNotNull(listingAfterDelete);
+            assertEquals(0, listingAfterDelete.size());
+        }
+    }
+
+    @Test(expected = FileNotFoundException.class)
+    public void testDeleteFileWhenDoesNotExist() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            transfer.deleteFile(null, null, "foo/bar/does-not-exist.txt");
+        }
+    }
+
+    @Test
+    public void testDeleteDirectory() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_4);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory exists
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(0, listing.size());
+
+            transfer.deleteDirectory(null, DIR_4);
+
+            // verify the directory no longer exists
+            try {
+                transfer.getListing();
+                Assert.fail("Should have thrown exception");
+            } catch (FileNotFoundException e) {
+                // nothing to do, expected
+            }
+        }
+    }
+
+    @Test(expected = IOException.class)
+    public void testDeleteDirectoryWhenDoesNotExist() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            transfer.deleteDirectory(null, "DOES-NOT-EXIST");
+        }
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsSimple() throws IOException {
+        final String remotePath = "DOES-NOT-EXIST";
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, remotePath);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory does not exist
+            try {
+                transfer.getListing();
+                Assert.fail("Should have failed");
+            } catch (FileNotFoundException e) {
+                // Nothing to do, expected
+            }
+
+            final String absolutePath = transfer.getAbsolutePath(null, remotePath);
+            transfer.ensureDirectoryExists(null, new File(absolutePath));
+
+            // verify the directory now exists
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(0, listing.size());
+        }
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsMultipleLevels() throws IOException {
+        final String remotePath = "A/B/C";
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, remotePath);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory does not exist
+            try {
+                transfer.getListing();
+                Assert.fail("Should have failed");
+            } catch (FileNotFoundException e) {
+                // Nothing to do, expected
+            }
+
+            final String absolutePath = transfer.getAbsolutePath(null, remotePath);
+            transfer.ensureDirectoryExists(null, new File(absolutePath));
+
+            // verify the directory now exists
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(0, listing.size());
+        }
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsWhenAlreadyExists() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory already exists
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(2, listing.size());
+
+            final String absolutePath = transfer.getAbsolutePath(null, DIR_2);
+            transfer.ensureDirectoryExists(null, new File(absolutePath));
+        }
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsWithDirectoryListingDisabled() throws IOException {
+        final String remotePath = "DOES-NOT-EXIST";
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, remotePath);
+        properties.put(SFTPTransfer.DISABLE_DIRECTORY_LISTING, "true");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory does not exist
+            try {
+                transfer.getListing();
+                Assert.fail("Should have failed");
+            } catch (FileNotFoundException e) {
+                // Nothing to do, expected
+            }
+
+            final String absolutePath = transfer.getAbsolutePath(null, remotePath);
+            transfer.ensureDirectoryExists(null, new File(absolutePath));
+
+            // verify the directory now exists
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(0, listing.size());
+        }
+    }
+
+    @Test(expected = IOException.class)
+    public void testEnsureDirectoryExistsWithDirectoryListingDisabledAndAlreadyExists() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
+        properties.put(SFTPTransfer.DISABLE_DIRECTORY_LISTING, "true");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory already exists
+            final List<FileInfo> listing = transfer.getListing();
+            assertNotNull(listing);
+            assertEquals(2, listing.size());
+
+            final String absolutePath = transfer.getAbsolutePath(null, DIR_2);
+            transfer.ensureDirectoryExists(null, new File(absolutePath));
+        }
+    }
+
+    @Test
+    public void testEnsureDirectoryExistsWithDirectoryListingDisabledAndParentDoesNotExist() throws IOException {
+        final String remotePath = "A/B/C";
+
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.REMOTE_PATH, remotePath);
+        properties.put(SFTPTransfer.DISABLE_DIRECTORY_LISTING, "true");
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            // verify the directory does not exist
+            try {
+                transfer.getListing();
+                Assert.fail("Should have failed");
+            } catch (FileNotFoundException e) {
+                // Nothing to do, expected
+            }
+
+            // Should swallow exception here
+            final String absolutePath = transfer.getAbsolutePath(null, remotePath);
+            transfer.ensureDirectoryExists(null, new File(absolutePath));
+        }
+    }
+
+    @Test
+    public void testGetRemoteFileInfo() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final FileInfo fileInfo = transfer.getRemoteFileInfo(null, DIR_2, FILE_1);
+            assertNotNull(fileInfo);
+            assertEquals(FILE_1, fileInfo.getFileName());
+        }
+    }
+
+    @Test
+    public void testGetRemoteFileInfoWhenPathDoesNotExist() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final FileInfo fileInfo = transfer.getRemoteFileInfo(null, "DOES-NOT-EXIST", FILE_1);
+            assertNull(fileInfo);
+        }
+    }
+
+    @Test
+    public void testGetRemoteFileInfoWhenFileDoesNotExist() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final FileInfo fileInfo = transfer.getRemoteFileInfo(null, DIR_2, "DOES-NOT-EXIST");
+            assertNull(fileInfo);
+        }
+    }
+
+    @Test
+    public void testGetRemoteFileInfoWhenFileIsADirectory() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final FileInfo fileInfo = transfer.getRemoteFileInfo(null, DIR_1, DIR_1_CHILD_1);
+            assertNull(fileInfo);
+        }
+    }
+
+    @Test
+    public void testRename() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final String source = DIR_2 + "/" + FILE_1;
+            final String target = DIR_2 + "/" + FILE_1 + "-RENAMED";
+
+            final FileInfo targetInfoBefore = transfer.getRemoteFileInfo(null, DIR_2, FILE_1 + "-RENAMED");
+            assertNull(targetInfoBefore);
+
+            transfer.rename(null, source, target);
+
+            final FileInfo targetInfoAfter = transfer.getRemoteFileInfo(null, DIR_2, FILE_1 + "-RENAMED");
+            assertNotNull(targetInfoAfter);
+        }
+    }
+
+    @Test(expected = FileNotFoundException.class)
+    public void testRenameWhenSourceDoesNotExist() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final String source = DIR_2 + "/DOES-NOT-EXIST";
+            final String target = DIR_2 + "/" + FILE_1 + "-RENAMED";
+            transfer.rename(null, source, target);
+        }
+    }
+
+    @Test(expected = IOException.class)
+    public void testRenameWhenTargetAlreadyExists() throws IOException {
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
+            final String source = DIR_2 + "/" + FILE_1;
+            final String target = DIR_2 + "/" + FILE_2;
+
+            final FileInfo targetInfoBefore = transfer.getRemoteFileInfo(null, DIR_2, FILE_2);
+            assertNotNull(targetInfoBefore);
+
+            transfer.rename(null, source, target);
+        }
+    }
+
+    @Test
+    public void testPutWithPermissions() throws IOException {
+        final String permissions = "rw-rw-rw-";
+
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.PERMISSIONS, permissions);
+
+        final String filename = "test-put-simple.txt";
+        final String fileContent = "this is a test";
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
+            final InputStream in = new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8))) {
+
+            // Verify file does not already exist
+            final FileInfo fileInfoBefore = transfer.getRemoteFileInfo(null, DIR_4, filename);
+            assertNull(fileInfoBefore);
+
+            final String fullPath = transfer.put(null, DIR_4, filename, in);
+            assertNotNull(fullPath);
+
+            // Verify file now exists
+            final FileInfo fileInfoAfter = transfer.getRemoteFileInfo(null, DIR_4, filename);
+            assertNotNull(fileInfoAfter);
+            assertEquals(permissions, fileInfoAfter.getPermissions());
+
+            // Verify correct content was written
+            final File writtenFile = new File(SFTP_ROOT_DIR + "/" + DIR_4 + "/" + filename);
+            final String retrievedContent = IOUtils.toString(writtenFile.toURI(), StandardCharsets.UTF_8);
+            assertEquals(fileContent, retrievedContent);
+        }
+    }
+
+    @Test
+    public void testPutWithTempFilename() throws IOException {
+        final String permissions = "rw-rw-rw-";
+
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.PERMISSIONS, permissions);
+        properties.put(SFTPTransfer.TEMP_FILENAME, "temp-file.txt");
+
+        final String filename = "test-put-simple.txt";
+        final String fileContent = "this is a test";
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
+            final InputStream in = new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8))) {
+
+            // Verify file does not already exist
+            final FileInfo fileInfoBefore = transfer.getRemoteFileInfo(null, DIR_4, filename);
+            assertNull(fileInfoBefore);
+
+            final String fullPath = transfer.put(null, DIR_4, filename, in);
+            assertNotNull(fullPath);
+
+            // Verify file now exists
+            final FileInfo fileInfoAfter = transfer.getRemoteFileInfo(null, DIR_4, filename);
+            assertNotNull(fileInfoAfter);
+            assertEquals(permissions, fileInfoAfter.getPermissions());
+        }
+    }
+
+    @Test
+    public void testPutWithLastModifiedTime() throws IOException, ParseException {
+        final String permissions = "rw-rw-rw-";
+        final String lastModifiedTime = "2019-09-01T11:11:11-0500";
+
+        final DateFormat formatter = new SimpleDateFormat(SFTPTransfer.FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+        final long expectedLastModifiedTime = formatter.parse(lastModifiedTime).getTime();
+
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.PERMISSIONS, permissions);
+        properties.put(SFTPTransfer.LAST_MODIFIED_TIME, lastModifiedTime);
+
+        final String filename = "test-put-simple.txt";
+        final String fileContent = "this is a test";
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
+            final InputStream in = new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8))) {
+
+            // Verify file does not already exist
+            final FileInfo fileInfoBefore = transfer.getRemoteFileInfo(null, DIR_4, filename);
+            assertNull(fileInfoBefore);
+
+            final String fullPath = transfer.put(null, DIR_4, filename, in);
+            assertNotNull(fullPath);
+
+            // Verify file now exists
+            final FileInfo fileInfoAfter = transfer.getRemoteFileInfo(null, DIR_4, filename);
+            assertNotNull(fileInfoAfter);
+            assertEquals(permissions, fileInfoAfter.getPermissions());
+            assertEquals(expectedLastModifiedTime, fileInfoAfter.getLastModifiedTime());
+        }
+    }
+
+    @Test(expected = IOException.class)
+    public void testPutWhenFileAlreadyExists() throws IOException {
+        final String permissions = "rw-rw-rw-";
+
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.PERMISSIONS, permissions);
+
+        final String fileContent = "this is a test";
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
+            final InputStream in = new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8))) {
+
+            // Verify file already exists
+            final FileInfo fileInfoBefore = transfer.getRemoteFileInfo(null, DIR_2, FILE_1);
+            assertNotNull(fileInfoBefore);
+
+            // Should fail because file already exists
+            transfer.put(null, DIR_2, FILE_1, in);
+        }
+    }
+
+    @Test(expected = IOException.class)
+    public void testPutWhenDirectoryDoesNotExist() throws IOException {
+        final String permissions = "rw-rw-rw-";
+
+        final Map<PropertyDescriptor, String> properties = createBaseProperties();
+        properties.put(SFTPTransfer.PERMISSIONS, permissions);
+
+        final String fileContent = "this is a test";
+
+        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
+            final InputStream in = new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8))) {
+            transfer.put(null, "DOES-NOT-EXIST", FILE_1, in);
+        }
+    }
+
+    private Map<PropertyDescriptor, String> createBaseProperties() {
+        final Map<PropertyDescriptor,String> properties = new HashMap<>();
+        properties.put(SFTPTransfer.HOSTNAME, "localhost");
+        properties.put(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort()));
+        properties.put(SFTPTransfer.USERNAME, sshTestServer.getUsername());
+        properties.put(SFTPTransfer.PASSWORD, sshTestServer.getPassword());
+        properties.put(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false");
+        return properties;
+    }
+
+    private SFTPTransfer createSFTPTransfer(final Map<PropertyDescriptor, String> properties) {
+        final PropertyContext propertyContext = new MockPropertyContext(properties);
+        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        return new SFTPTransfer(propertyContext, logger);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index 59fa62c..64a347d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -202,14 +202,9 @@
                 <version>1.12</version>
             </dependency>
             <dependency>
-                <groupId>com.jcraft</groupId>
-                <artifactId>jsch</artifactId>
-                <version>0.1.55</version>
-            </dependency>
-            <dependency>
-                <groupId>com.jcraft</groupId>
-                <artifactId>jzlib</artifactId>
-                <version>1.1.3</version>
+                <groupId>com.hierynomus</groupId>
+                <artifactId>sshj</artifactId>
+                <version>0.27.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.httpcomponents</groupId>