You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/08/15 02:25:08 UTC
nifi git commit: NIFI-3281 - fix for (S)FTP processors when using EL against FFs
Repository: nifi
Updated Branches:
refs/heads/master 50f22162b -> a1706d12f
NIFI-3281 - fix for (S)FTP processors when using EL against FFs
NIFI-3281 - Review - handled completePendingCommand return value and added a unit test for ListFTP
NIFI-3281 - Review - Added flow file for EL evaluation in other methods and added unit test for NIFI-3590
This closes #1974.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1706d12
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1706d12
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1706d12
Branch: refs/heads/master
Commit: a1706d12f5bbe824c440a375945dd6038baee4cf
Parents: 50f2216
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Jul 4 10:48:41 2017 +0200
Committer: Koji Kawamura <ij...@apache.org>
Committed: Tue Aug 15 11:24:48 2017 +0900
----------------------------------------------------------------------
.../processors/standard/FetchFileTransfer.java | 10 ++-
.../processors/standard/GetFileTransfer.java | 2 +-
.../processors/standard/ListFileTransfer.java | 2 +-
.../processors/standard/PutFileTransfer.java | 2 +-
.../processors/standard/util/FTPTransfer.java | 19 ++++--
.../processors/standard/util/FileTransfer.java | 8 ++-
.../processors/standard/util/SFTPTransfer.java | 13 ++--
.../nifi/processors/standard/TestFTP.java | 68 +++++++++++++++++++-
.../standard/TestFetchFileTransfer.java | 11 +++-
9 files changed, 111 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 6182b0a..0023c4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -239,9 +239,13 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
- transfer.flush();
}
});
+
+ if(!transfer.flush(flowFile)) {
+ throw new IOException("completePendingCommand returned false, file transfer failed");
+ }
+
transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
} catch (final FileNotFoundException e) {
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
@@ -297,7 +301,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
try {
- transfer.deleteFile(null, filename);
+ transfer.deleteFile(flowFile, null, filename);
} catch (final FileNotFoundException e) {
// file doesn't exist -- effectively the same as removing it. Move on.
} catch (final IOException ioe) {
@@ -313,7 +317,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
final String target = targetDir + simpleFilename;
try {
- transfer.rename(filename, target);
+ transfer.rename(flowFile, filename, target);
} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
new Object[] {flowFile, host, port, filename, ioe}, ioe);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
index 00dfccf..4ce31de 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
@@ -208,7 +208,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
if (deleteOriginal) {
try {
- transfer.deleteFile(null, file.getFullPathFileName());
+ transfer.deleteFile(flowFile, null, file.getFullPathFileName());
} catch (final IOException e) {
logger.error("Failed to remove remote file {} due to {}; deleting local copy",
new Object[]{file.getFullPathFileName(), e});
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index 58e443a..3f35c4e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -94,7 +94,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
@Override
protected String getPath(final ProcessContext context) {
- return context.getProperty(REMOTE_PATH).getValue();
+ return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue();
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index 054d1d8..cbaa9ec 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@ -230,7 +230,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
break;
case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
- transfer.deleteFile(path, fileName);
+ transfer.deleteFile(flowFile, path, fileName);
destinationRelationship = REL_SUCCESS;
transferFile = true;
penalizeFile = false;
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index b64a6f8..aeec6c3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -289,7 +289,7 @@ public class FTPTransfer implements FileTransfer {
@Override
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
- final FTPClient client = getClient(null);
+ final FTPClient client = getClient(flowFile);
InputStream in = client.retrieveFileStream(remoteFileName);
if (in == null) {
throw new IOException(client.getReplyString());
@@ -304,6 +304,11 @@ public class FTPTransfer implements FileTransfer {
}
@Override
+ public boolean flush(final FlowFile flowFile) throws IOException {
+ return getClient(flowFile).completePendingCommand();
+ }
+
+ @Override
public FileInfo getRemoteFileInfo(final FlowFile flowFile, String path, String remoteFileName) throws IOException {
final FTPClient client = getClient(flowFile);
@@ -444,8 +449,8 @@ public class FTPTransfer implements FileTransfer {
@Override
- public void rename(final String source, final String target) throws IOException {
- final FTPClient client = getClient(null);
+ public void rename(final FlowFile flowFile, final String source, final String target) throws IOException {
+ final FTPClient client = getClient(flowFile);
final boolean renameSuccessful = client.rename(source, target);
if (!renameSuccessful) {
throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString());
@@ -453,8 +458,8 @@ public class FTPTransfer implements FileTransfer {
}
@Override
- public void deleteFile(final String path, final String remoteFileName) throws IOException {
- final FTPClient client = getClient(null);
+ public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
+ final FTPClient client = getClient(flowFile);
if (path != null) {
setWorkingDirectory(path);
}
@@ -464,8 +469,8 @@ public class FTPTransfer implements FileTransfer {
}
@Override
- public void deleteDirectory(final String remoteDirectoryName) throws IOException {
- final FTPClient client = getClient(null);
+ public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException {
+ final FTPClient client = getClient(flowFile);
final boolean success = client.removeDirectory(remoteDirectoryName);
if (!success) {
throw new IOException("Failed to remove directory " + remoteDirectoryName + " due to " + client.getReplyString());
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index 22d9ec5..ac7f728 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -39,15 +39,17 @@ public interface FileTransfer extends Closeable {
void flush() throws IOException;
+ boolean flush(FlowFile flowFile) throws IOException;
+
FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;
String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException;
- void rename(String source, String target) throws IOException;
+ void rename(FlowFile flowFile, String source, String target) throws IOException;
- void deleteFile(String path, String remoteFileName) throws IOException;
+ void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException;
- void deleteDirectory(String remoteDirectoryName) throws IOException;
+ void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException;
boolean isClosed();
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index a6a9e4b..bc31ba9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -309,7 +309,12 @@ public class SFTPTransfer implements FileTransfer {
}
@Override
- public void deleteFile(final String path, final String remoteFileName) throws IOException {
+ public boolean flush(final FlowFile flowFile) throws IOException {
+ return true;
+ }
+
+ @Override
+ public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName;
try {
sftp.rm(fullPath);
@@ -326,7 +331,7 @@ public class SFTPTransfer implements FileTransfer {
}
@Override
- public void deleteDirectory(final String remoteDirectoryName) throws IOException {
+ public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException {
try {
sftp.rm(remoteDirectoryName);
} catch (final SftpException e) {
@@ -613,8 +618,8 @@ public class SFTPTransfer implements FileTransfer {
}
@Override
- public void rename(final String source, final String target) throws IOException {
- final ChannelSftp sftp = getChannel(null);
+ public void rename(final FlowFile flowFile, final String source, final String target) throws IOException {
+ final ChannelSftp sftp = getChannel(flowFile);
try {
sftp.rename(source, target);
} catch (final SftpException e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
index 102931f..d8797dc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
@@ -36,7 +36,6 @@ import org.mockftpserver.fake.filesystem.FileSystem;
import org.mockftpserver.fake.filesystem.WindowsFakeFileSystem;
import java.io.FileInputStream;
-
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
@@ -167,4 +166,71 @@ public class TestFTP {
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(GetFTP.REL_SUCCESS).get(0);
retrievedFile.assertContentEquals("Just some random test test test chocolate");
}
+
+ @Test
+ public void basicFileFetch() throws IOException {
+ FileSystem results = fakeFtpServer.getFileSystem();
+
+ FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
+ sampleFile.setContents("Just some random test test test chocolate");
+ results.add(sampleFile);
+
+ // Check file exists
+ Assert.assertTrue(results.exists("c:\\data\\randombytes-2"));
+
+ TestRunner runner = TestRunners.newTestRunner(FetchFTP.class);
+ runner.setProperty(FetchFTP.HOSTNAME, "${host}");
+ runner.setProperty(FetchFTP.USERNAME, "${username}");
+ runner.setProperty(FTPTransfer.PASSWORD, password);
+ runner.setProperty(FTPTransfer.PORT, "${port}");
+ runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\randombytes-2");
+ runner.setProperty(FetchFTP.COMPLETION_STRATEGY, FetchFTP.COMPLETION_MOVE);
+ runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data");
+
+
+ Map<String, String> attrs = new HashMap<String, String>();
+ attrs.put("host", "localhost");
+ attrs.put("username", username);
+ attrs.put("port", Integer.toString(ftpPort));
+ runner.enqueue("", attrs);
+
+ runner.run();
+
+ final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
+ retrievedFile.assertContentEquals("Just some random test test test chocolate");
+ }
+
+ @Test
+ public void basicFileList() throws IOException {
+ FileSystem results = fakeFtpServer.getFileSystem();
+
+ FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
+ sampleFile.setContents("Just some random test test test chocolate");
+ results.add(sampleFile);
+
+ // Check file exists
+ Assert.assertTrue(results.exists("c:\\data\\randombytes-2"));
+
+ TestRunner runner = TestRunners.newTestRunner(ListFTP.class);
+ runner.setProperty(ListFTP.HOSTNAME, "localhost");
+ runner.setProperty(ListFTP.USERNAME, username);
+ runner.setProperty(FTPTransfer.PASSWORD, password);
+ runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
+ runner.setProperty(ListFTP.REMOTE_PATH, "/");
+ runner.assertValid();
+
+ runner.run();
+
+ final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
+ runner.assertAllFlowFilesContainAttribute("ftp.remote.host");
+ runner.assertAllFlowFilesContainAttribute("ftp.remote.port");
+ runner.assertAllFlowFilesContainAttribute("ftp.listing.user");
+ runner.assertAllFlowFilesContainAttribute(ListFile.FILE_OWNER_ATTRIBUTE);
+ runner.assertAllFlowFilesContainAttribute(ListFile.FILE_GROUP_ATTRIBUTE);
+ runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE);
+ runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE);
+ runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE);
+ retrievedFile.assertAttributeEquals("ftp.listing.user", username);
+ retrievedFile.assertAttributeEquals("filename", "randombytes-2");
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
index 2b78a4b..4965893 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -282,6 +282,11 @@ public class TestFetchFileTransfer {
}
@Override
+ public boolean flush(FlowFile flowFile) throws IOException {
+ return true;
+ }
+
+ @Override
public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
return null;
}
@@ -292,7 +297,7 @@ public class TestFetchFileTransfer {
}
@Override
- public void deleteFile(String path, String remoteFileName) throws IOException {
+ public void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException {
if (!allowDelete) {
throw new PermissionDeniedException("test permission denied");
}
@@ -305,7 +310,7 @@ public class TestFetchFileTransfer {
}
@Override
- public void rename(String source, String target) throws IOException {
+ public void rename(FlowFile flowFile, String source, String target) throws IOException {
if (!allowRename) {
throw new PermissionDeniedException("test permission denied");
}
@@ -319,7 +324,7 @@ public class TestFetchFileTransfer {
}
@Override
- public void deleteDirectory(String remoteDirectoryName) throws IOException {
+ public void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException {
}