You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/04/27 15:52:18 UTC
nifi git commit: NIFI-5090: Create move target dir dynamically at
FetchFTP and FetchSFTP
Repository: nifi
Updated Branches:
refs/heads/master 159b64b4c -> 6e8b1c8f7
NIFI-5090: Create move target dir dynamically at FetchFTP and FetchSFTP
Support creating target parent directories even if directory listing is disabled.
fixed typo in doc
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2642
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6e8b1c8f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6e8b1c8f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6e8b1c8f
Branch: refs/heads/master
Commit: 6e8b1c8f74b957eaef04d1fa7574c29d9c3acbe0
Parents: 159b64b
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Apr 18 11:14:41 2018 +0900
Committer: Matthew Burgess <ma...@apache.org>
Committed: Fri Apr 27 11:48:26 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/FetchFTP.java | 6 +-
.../processors/standard/FetchFileTransfer.java | 32 ++-
.../nifi/processors/standard/FetchSFTP.java | 10 +
.../processors/standard/PutFileTransfer.java | 6 +-
.../processors/standard/util/FileTransfer.java | 16 ++
.../processors/standard/util/SFTPTransfer.java | 64 ++---
.../standard/TestFetchFileTransfer.java | 30 ++-
.../standard/util/TestSFTPTransfer.java | 247 +++++++++++++++++++
8 files changed, 361 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
index 6f3f84d..4886274 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
@@ -29,11 +29,6 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer;
-import org.apache.nifi.processors.standard.FetchFileTransfer;
-import org.apache.nifi.processors.standard.GetFTP;
-import org.apache.nifi.processors.standard.GetSFTP;
-import org.apache.nifi.processors.standard.PutFTP;
-import org.apache.nifi.processors.standard.PutSFTP;
import org.apache.nifi.processors.standard.util.FTPTransfer;
// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
@@ -62,6 +57,7 @@ public class FetchFTP extends FetchFileTransfer {
properties.add(REMOTE_FILENAME);
properties.add(COMPLETION_STRATEGY);
properties.add(MOVE_DESTINATION_DIR);
+ properties.add(MOVE_CREATE_DIRECTORY);
properties.add(FTPTransfer.CONNECTION_TIMEOUT);
properties.add(FTPTransfer.DATA_TIMEOUT);
properties.add(FTPTransfer.USE_COMPRESSION);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 3ba0066..c44a406 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -37,6 +37,7 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -101,11 +102,19 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.defaultValue(COMPLETION_NONE.getValue())
.required(true)
.build();
+ static final PropertyDescriptor MOVE_CREATE_DIRECTORY = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(FileTransfer.CREATE_DIRECTORY).description(String.format("Used when '%s' is '%s'. %s",
+ COMPLETION_STRATEGY.getDisplayName(),
+ COMPLETION_MOVE.getDisplayName(),
+ FileTransfer.CREATE_DIRECTORY.getDescription()))
+ .required(false)
+ .build();
static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
.name("Move Destination Directory")
- .description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. "
- + "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on"
- + "the remote system, or the rename will fail.")
+ .description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+ + "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on "
+ + "the remote system if '%s' is disabled, or the rename will fail.",
+ COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName()))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
@@ -189,6 +198,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
properties.add(REMOTE_FILENAME);
properties.add(COMPLETION_STRATEGY);
properties.add(MOVE_DESTINATION_DIR);
+ properties.add(MOVE_CREATE_DIRECTORY);
return properties;
}
@@ -308,15 +318,19 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
new Object[]{flowFile, host, port, filename, ioe}, ioe);
}
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
- String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
- if (!targetDir.endsWith("/")) {
- targetDir = targetDir + "/";
- }
+ final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
- final String target = targetDir + simpleFilename;
try {
- transfer.rename(flowFile, filename, target);
+ final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
+ final File targetFile = new File(absoluteTargetDirPath, simpleFilename);
+ if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
+ // Create the target directory if necessary.
+ transfer.ensureDirectoryExists(flowFile, targetFile.getParentFile());
+ }
+
+ transfer.rename(flowFile, filename, targetFile.getAbsolutePath());
+
} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
new Object[]{flowFile, host, port, filename, ioe}, ioe);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 79eb1e6..6846557 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -49,6 +49,14 @@ public class FetchSFTP extends FetchFileTransfer {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
+ final PropertyDescriptor disableDirectoryListing = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(SFTPTransfer.DISABLE_DIRECTORY_LISTING)
+ .description(String.format("Control how '%s' is created when '%s' is '%s' and '%s' is enabled. %s",
+ MOVE_DESTINATION_DIR.getDisplayName(),
+ COMPLETION_STRATEGY.getDisplayName(),
+ COMPLETION_MOVE.getDisplayName(),
+ MOVE_CREATE_DIRECTORY.getDisplayName(),
+ SFTPTransfer.DISABLE_DIRECTORY_LISTING.getDescription())).build();
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HOSTNAME);
@@ -60,6 +68,8 @@ public class FetchSFTP extends FetchFileTransfer {
properties.add(REMOTE_FILENAME);
properties.add(COMPLETION_STRATEGY);
properties.add(MOVE_DESTINATION_DIR);
+ properties.add(MOVE_CREATE_DIRECTORY);
+ properties.add(disableDirectoryListing);
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
properties.add(SFTPTransfer.DATA_TIMEOUT);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index cbaa9ec..12eafdb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@ -106,11 +106,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
if (rootPath == null) {
workingDirPath = null;
} else {
- File workingDirectory = new File(rootPath);
- if (!workingDirectory.getPath().startsWith("/") && !workingDirectory.getPath().startsWith("\\")) {
- workingDirectory = new File(transfer.getHomeDirectory(flowFile), workingDirectory.getPath());
- }
- workingDirPath = workingDirectory.getPath().replace("\\", "/");
+ workingDirPath = transfer.getAbsolutePath(flowFile, rootPath);
}
final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index 1163ea8..d500b9d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -58,6 +58,22 @@ public interface FileTransfer extends Closeable {
void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException;
+ /**
+ * Compute an absolute file path for the given remote path: a path that starts with neither a slash nor a backslash is resolved against getHomeDirectory(flowFile), and backslashes are replaced with slashes.
+ * @param flowFile is used to setup file transfer client with its attribute values, to get user home directory
+ * @param remotePath the target remote path; must not be null because it is dereferenced without a null check
+ * @return The absolute path for the given remote path, using forward slashes as separators
+ */
+ default String getAbsolutePath(FlowFile flowFile, String remotePath) throws IOException {
+ final String absoluteRemotePath;
+ if (!remotePath.startsWith("/") && !remotePath.startsWith("\\")) {
+ absoluteRemotePath = new File(getHomeDirectory(flowFile), remotePath).getPath();
+ } else {
+ absoluteRemotePath = remotePath;
+ }
+ return absoluteRemotePath.replace("\\", "/");
+ }
+
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The fully qualified hostname or IP address of the remote system")
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index d7aa6e3..c11a53b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -106,7 +106,7 @@ public class SFTPTransfer implements FileTransfer {
.description("If set to 'true', directory listing is not performed prior to create missing directories." +
" By default, this processor executes a directory listing command" +
" to see target directory existence before creating missing directories." +
- " However, there are situations that you might need to disable the directory listing such as followings." +
+ " However, there are situations that you might need to disable the directory listing such as the following." +
" Directory listing might fail with some permission setups (e.g. chmod 100) on a directory." +
" Also, if any other SFTP client created the directory after this processor performed a listing" +
" and before a directory creation request by this processor is finished," +
@@ -353,48 +353,52 @@ public class SFTPTransfer implements FileTransfer {
final String remoteDirectory = directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", "");
// if we disable the directory listing, we just want to blindly perform the mkdir command,
- // eating any exceptions thrown (like if the directory already exists).
+ // eating failure exceptions thrown (like if the directory already exists).
if (disableDirectoryListing) {
try {
+ // Blindly create the dir.
channel.mkdir(remoteDirectory);
+ // The remote directory did not exist, and was created successfully.
+ return;
} catch (SftpException e) {
- if (e.id != ChannelSftp.SSH_FX_FAILURE) {
+ if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
+ // No Such File. This happens when parent directory was not found.
+ logger.debug(String.format("Could not create %s due to 'No such file'. Will try to create the parent dir.", remoteDirectory));
+ } else if (e.id == ChannelSftp.SSH_FX_FAILURE) {
+ // Swallow '4: Failure' including the remote directory already exists.
+ logger.debug("Could not blindly create remote directory due to " + e.getMessage(), e);
+ return;
+ } else {
throw new IOException("Could not blindly create remote directory due to " + e.getMessage(), e);
}
}
- return;
+ } else {
+ try {
+ // Check dir existence.
+ channel.stat(remoteDirectory);
+ // The remote directory already exists.
+ return;
+ } catch (final SftpException e) {
+ if (e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE) {
+ throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e);
+ }
+ }
}
- // end if disableDirectoryListing
- boolean exists;
+ // first ensure parent directories exist before creating this one
+ if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
+ ensureDirectoryExists(flowFile, directoryName.getParentFile());
+ }
+ logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
try {
- channel.stat(remoteDirectory);
- exists = true;
+ channel.mkdir(remoteDirectory);
+ logger.debug("Created {}", new Object[] {remoteDirectory});
} catch (final SftpException e) {
- if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
- // No Such File
- exists = false;
- } else {
- throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e);
- }
- }
-
- if (!exists) {
- // first ensure parent directories exist before creating this one
- if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
- ensureDirectoryExists(flowFile, directoryName.getParentFile());
- }
- logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
- try {
- channel.mkdir(remoteDirectory);
- logger.debug("Created {}", new Object[] {remoteDirectory});
- } catch (final SftpException e) {
- throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
- }
+ throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
}
}
- private ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
+ protected ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
if (sftp != null) {
String sessionhost = session.getHost();
String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -633,7 +637,7 @@ public class SFTPTransfer implements FileTransfer {
} catch (final SftpException e) {
switch (e.id) {
case ChannelSftp.SSH_FX_NO_SUCH_FILE:
- throw new FileNotFoundException();
+ throw new FileNotFoundException("No such file or directory");
case ChannelSftp.SSH_FX_PERMISSION_DENIED:
throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
default:
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
index 4965893..de76b07 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -130,6 +130,7 @@ public class TestFetchFileTransfer {
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
+ runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
@@ -228,10 +229,35 @@ public class TestFetchFileTransfer {
assertTrue(proc.fileContents.containsKey("hello.txt"));
}
+ @Test
+ public void testCreateDirFails() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+ runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+ runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
+ runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");
+
+ proc.addContent("hello.txt", "world".getBytes());
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+ proc.allowCreateDir = false;
+ // Directory creation is denied above; the fetch itself is still expected to be routed to success.
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+ assertEquals(1, proc.fileContents.size());
+ // The remote file is expected to still be present under its original name.
+ assertTrue(proc.fileContents.containsKey("hello.txt"));
+ }
+
private static class TestableFetchFileTransfer extends FetchFileTransfer {
private boolean allowAccess = true;
private boolean allowDelete = true;
+ private boolean allowCreateDir = true;
private boolean allowRename = true;
private boolean closed = false;
private final Map<String, byte[]> fileContents = new HashMap<>();
@@ -340,7 +366,9 @@ public class TestFetchFileTransfer {
@Override
public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
-
+ if (!allowCreateDir) {
+ throw new PermissionDeniedException("test permission denied");
+ }
}
};
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e8b1c8f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
new file mode 100644
index 0000000..9a17838
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.SftpException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_FAILURE;
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_NO_SUCH_FILE;
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_PERMISSION_DENIED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestSFTPTransfer {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestSFTPTransfer.class);
+
+ private SFTPTransfer createSftpTransfer(ProcessContext processContext, ChannelSftp channel) {
+ final ComponentLog componentLog = mock(ComponentLog.class);
+ return new SFTPTransfer(processContext, componentLog) {
+ @Override
+ protected ChannelSftp getChannel(FlowFile flowFile) throws IOException {
+ return channel;
+ }
+ };
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsAlreadyExisted() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+ // Dir existence check should be done by stat
+ verify(channel).stat(eq("/dir1/dir2/dir3"));
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsFailedToStat() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ // stat for dir3 fails with a generic failure rather than 'No such file', so its existence cannot be determined.
+ when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_FAILURE, "Failure"));
+
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ try {
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+ fail("Should fail");
+ } catch (IOException e) {
+ assertEquals("Failed to determine if remote directory exists at /dir1/dir2/dir3 due to 4: Failure", e.getMessage());
+ }
+
+ // Dir existence check should be done by stat
+ verify(channel).stat(eq("/dir1/dir2/dir3"));
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsNotExisted() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ // stat for the parent was successful, simulating that dir2 exists, but no dir3.
+ when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+ // Dir existence check should be done by stat
+ verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+ verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked
+ verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsParentNotExisted() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ // stat for the dir1 was successful, simulating that dir1 exists, but no dir2 and dir3.
+ when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+ when(channel.stat("/dir1/dir2")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+ // Dir existence check should be done by stat
+ verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+ verify(channel).stat(eq("/dir1/dir2")); // dir2 was not found, too
+ verify(channel).stat(eq("/dir1")); // dir1 was found
+ verify(channel).mkdir(eq("/dir1/dir2")); // dir1 existed, so dir2 was created.
+ verify(channel).mkdir(eq("/dir1/dir2/dir3")); // then dir3 was created.
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsNotExistedFailedToCreate() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ // stat for the parent was successful, simulating that dir2 exists, but no dir3.
+ when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
+ // Failed to create dir3.
+ doThrow(new SftpException(SSH_FX_FAILURE, "Failed")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ try {
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+ fail("Should fail");
+ } catch (IOException e) {
+ assertEquals("Failed to create remote directory /dir1/dir2/dir3 due to 4: Failed", e.getMessage());
+ }
+
+ // Dir existence check should be done by stat
+ verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
+ verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked
+ verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so creating dir3 was attempted, but it failed.
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsBlindlyNotExisted() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
+
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+ // stat should not be called.
+ verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+ verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsBlindlyParentNotExisted() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
+
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ final AtomicInteger mkdirCount = new AtomicInteger(0);
+ doAnswer(invocation -> {
+ final int cnt = mkdirCount.getAndIncrement();
+ if (cnt == 0) {
+ // If the parent dir does not exist, no such file exception is thrown.
+ throw new SftpException(SSH_FX_NO_SUCH_FILE, "Failure");
+ } else {
+ logger.info("Created the dir successfully for the 2nd time");
+ }
+ return true;
+ }).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+ // stat should not be called.
+ verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+ // dir3 was created blindly, but failed for the 1st time, and succeeded for the 2nd time.
+ verify(channel, times(2)).mkdir(eq("/dir1/dir2/dir3"));
+ verify(channel).mkdir(eq("/dir1/dir2")); // dir2 was created successfully.
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsBlindlyAlreadyExisted() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
+
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ // If the dir existed, a failure exception is thrown, but should be swallowed.
+ doThrow(new SftpException(SSH_FX_FAILURE, "Failure")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+
+ // stat should not be called.
+ verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+ verify(channel).mkdir(eq("/dir1/dir2/dir3")); // mkdir for dir3 was attempted blindly, and its failure was swallowed.
+ }
+
+ @Test
+ public void testEnsureDirectoryExistsBlindlyFailed() throws IOException, SftpException {
+ final ProcessContext processContext = mock(ProcessContext.class);
+ when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
+
+ final ChannelSftp channel = mock(ChannelSftp.class);
+ doThrow(new SftpException(SSH_FX_PERMISSION_DENIED, "Permission denied")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
+
+ final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
+ final MockFlowFile flowFile = new MockFlowFile(0);
+ final File remoteDir = new File("/dir1/dir2/dir3");
+ try {
+ sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
+ fail("Should fail");
+ } catch (IOException e) {
+ assertEquals("Could not blindly create remote directory due to Permission denied", e.getMessage());
+ }
+
+ // stat should not be called.
+ verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
+ verify(channel).mkdir(eq("/dir1/dir2/dir3")); // mkdir for dir3 was attempted blindly, but it failed.
+ }
+
+}