You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/08/15 02:25:08 UTC

nifi git commit: NIFI-3281 - fix for (S)FTP processors when using EL against FFs

Repository: nifi
Updated Branches:
  refs/heads/master 50f22162b -> a1706d12f


NIFI-3281 - fix for (S)FTP processors when using EL against FFs

NIFI-3281 - Review - handle completePendingCommand return and added a unit test for ListFTP

NIFI-3281 - Review - Added flow file for EL evaluation in other methods and added unit test for NIFI-3590

This closes #1974.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1706d12
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1706d12
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1706d12

Branch: refs/heads/master
Commit: a1706d12f5bbe824c440a375945dd6038baee4cf
Parents: 50f2216
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Jul 4 10:48:41 2017 +0200
Committer: Koji Kawamura <ij...@apache.org>
Committed: Tue Aug 15 11:24:48 2017 +0900

----------------------------------------------------------------------
 .../processors/standard/FetchFileTransfer.java  | 10 ++-
 .../processors/standard/GetFileTransfer.java    |  2 +-
 .../processors/standard/ListFileTransfer.java   |  2 +-
 .../processors/standard/PutFileTransfer.java    |  2 +-
 .../processors/standard/util/FTPTransfer.java   | 19 ++++--
 .../processors/standard/util/FileTransfer.java  |  8 ++-
 .../processors/standard/util/SFTPTransfer.java  | 13 ++--
 .../nifi/processors/standard/TestFTP.java       | 68 +++++++++++++++++++-
 .../standard/TestFetchFileTransfer.java         | 11 +++-
 9 files changed, 111 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 6182b0a..0023c4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -239,9 +239,13 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
                 @Override
                 public void process(final OutputStream out) throws IOException {
                     StreamUtils.copy(in, out);
-                    transfer.flush();
                 }
             });
+
+            if(!transfer.flush(flowFile)) {
+                throw new IOException("completePendingCommand returned false, file transfer failed");
+            }
+
             transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
         } catch (final FileNotFoundException e) {
             getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
@@ -297,7 +301,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
         if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
             try {
-                transfer.deleteFile(null, filename);
+                transfer.deleteFile(flowFile, null, filename);
             } catch (final FileNotFoundException e) {
                 // file doesn't exist -- effectively the same as removing it. Move on.
             } catch (final IOException ioe) {
@@ -313,7 +317,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
             final String target = targetDir + simpleFilename;
 
             try {
-                transfer.rename(filename, target);
+                transfer.rename(flowFile, filename, target);
             } catch (final IOException ioe) {
                 getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
                     new Object[] {flowFile, host, port, filename, ioe}, ioe);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
index 00dfccf..4ce31de 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
@@ -208,7 +208,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
 
                     if (deleteOriginal) {
                         try {
-                            transfer.deleteFile(null, file.getFullPathFileName());
+                            transfer.deleteFile(flowFile, null, file.getFullPathFileName());
                         } catch (final IOException e) {
                             logger.error("Failed to remove remote file {} due to {}; deleting local copy",
                                     new Object[]{file.getFullPathFileName(), e});

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index 58e443a..3f35c4e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -94,7 +94,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
 
     @Override
     protected String getPath(final ProcessContext context) {
-        return context.getProperty(REMOTE_PATH).getValue();
+        return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
index 054d1d8..cbaa9ec 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java
@@ -230,7 +230,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
                 logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
                 break;
             case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
-                transfer.deleteFile(path, fileName);
+                transfer.deleteFile(flowFile, path, fileName);
                 destinationRelationship = REL_SUCCESS;
                 transferFile = true;
                 penalizeFile = false;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index b64a6f8..aeec6c3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -289,7 +289,7 @@ public class FTPTransfer implements FileTransfer {
 
     @Override
     public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
-        final FTPClient client = getClient(null);
+        final FTPClient client = getClient(flowFile);
         InputStream in = client.retrieveFileStream(remoteFileName);
         if (in == null) {
             throw new IOException(client.getReplyString());
@@ -304,6 +304,11 @@ public class FTPTransfer implements FileTransfer {
     }
 
     @Override
+    public boolean flush(final FlowFile flowFile) throws IOException {
+        return getClient(flowFile).completePendingCommand();
+    }
+
+    @Override
     public FileInfo getRemoteFileInfo(final FlowFile flowFile, String path, String remoteFileName) throws IOException {
         final FTPClient client = getClient(flowFile);
 
@@ -444,8 +449,8 @@ public class FTPTransfer implements FileTransfer {
 
 
     @Override
-    public void rename(final String source, final String target) throws IOException {
-        final FTPClient client = getClient(null);
+    public void rename(final FlowFile flowFile, final String source, final String target) throws IOException {
+        final FTPClient client = getClient(flowFile);
         final boolean renameSuccessful = client.rename(source, target);
         if (!renameSuccessful) {
             throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString());
@@ -453,8 +458,8 @@ public class FTPTransfer implements FileTransfer {
     }
 
     @Override
-    public void deleteFile(final String path, final String remoteFileName) throws IOException {
-        final FTPClient client = getClient(null);
+    public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
+        final FTPClient client = getClient(flowFile);
         if (path != null) {
             setWorkingDirectory(path);
         }
@@ -464,8 +469,8 @@ public class FTPTransfer implements FileTransfer {
     }
 
     @Override
-    public void deleteDirectory(final String remoteDirectoryName) throws IOException {
-        final FTPClient client = getClient(null);
+    public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException {
+        final FTPClient client = getClient(flowFile);
         final boolean success = client.removeDirectory(remoteDirectoryName);
         if (!success) {
             throw new IOException("Failed to remove directory " + remoteDirectoryName + " due to " + client.getReplyString());

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index 22d9ec5..ac7f728 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -39,15 +39,17 @@ public interface FileTransfer extends Closeable {
 
     void flush() throws IOException;
 
+    boolean flush(FlowFile flowFile) throws IOException;
+
     FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;
 
     String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException;
 
-    void rename(String source, String target) throws IOException;
+    void rename(FlowFile flowFile, String source, String target) throws IOException;
 
-    void deleteFile(String path, String remoteFileName) throws IOException;
+    void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException;
 
-    void deleteDirectory(String remoteDirectoryName) throws IOException;
+    void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException;
 
     boolean isClosed();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index a6a9e4b..bc31ba9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -309,7 +309,12 @@ public class SFTPTransfer implements FileTransfer {
     }
 
     @Override
-    public void deleteFile(final String path, final String remoteFileName) throws IOException {
+    public boolean flush(final FlowFile flowFile) throws IOException {
+        return true;
+    }
+
+    @Override
+    public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
         final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName;
         try {
             sftp.rm(fullPath);
@@ -326,7 +331,7 @@ public class SFTPTransfer implements FileTransfer {
     }
 
     @Override
-    public void deleteDirectory(final String remoteDirectoryName) throws IOException {
+    public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException {
         try {
             sftp.rm(remoteDirectoryName);
         } catch (final SftpException e) {
@@ -613,8 +618,8 @@ public class SFTPTransfer implements FileTransfer {
     }
 
     @Override
-    public void rename(final String source, final String target) throws IOException {
-        final ChannelSftp sftp = getChannel(null);
+    public void rename(final FlowFile flowFile, final String source, final String target) throws IOException {
+        final ChannelSftp sftp = getChannel(flowFile);
         try {
             sftp.rename(source, target);
         } catch (final SftpException e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
index 102931f..d8797dc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
@@ -36,7 +36,6 @@ import org.mockftpserver.fake.filesystem.FileSystem;
 import org.mockftpserver.fake.filesystem.WindowsFakeFileSystem;
 
 import java.io.FileInputStream;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -167,4 +166,71 @@ public class TestFTP {
         final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(GetFTP.REL_SUCCESS).get(0);
         retrievedFile.assertContentEquals("Just some random test test test chocolate");
     }
+
+    @Test
+    public void basicFileFetch() throws IOException {
+        FileSystem results = fakeFtpServer.getFileSystem();
+
+        FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
+        sampleFile.setContents("Just some random test test test chocolate");
+        results.add(sampleFile);
+
+        // Sanity check: the sample file must be present on the fake FTP file system before fetching it
+        Assert.assertTrue(results.exists("c:\\data\\randombytes-2"));
+        // Host, username and port are set as ${...} expressions, so they can only resolve from the incoming FlowFile
+        TestRunner runner = TestRunners.newTestRunner(FetchFTP.class);
+        runner.setProperty(FetchFTP.HOSTNAME, "${host}");
+        runner.setProperty(FetchFTP.USERNAME, "${username}");
+        runner.setProperty(FTPTransfer.PASSWORD, password);
+        runner.setProperty(FTPTransfer.PORT, "${port}");
+        runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\randombytes-2");
+        runner.setProperty(FetchFTP.COMPLETION_STRATEGY, FetchFTP.COMPLETION_MOVE);
+        runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data");
+
+        // Attributes backing the ${host}, ${username} and ${port} expressions configured above
+        Map<String, String> attrs = new HashMap<String, String>();
+        attrs.put("host", "localhost");
+        attrs.put("username", username);
+        attrs.put("port", Integer.toString(ftpPort));
+        runner.enqueue("", attrs);
+
+        runner.run();
+        // The fetched FlowFile must carry exactly the content stored in the sample file
+        final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
+        retrievedFile.assertContentEquals("Just some random test test test chocolate");
+    }
+
+    @Test
+    public void basicFileList() throws IOException {
+        FileSystem results = fakeFtpServer.getFileSystem();
+        // Seed the fake FTP file system with one file so that the listing has an entry to report
+        FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
+        sampleFile.setContents("Just some random test test test chocolate");
+        results.add(sampleFile);
+
+        // Sanity check: the sample file must be present before listing
+        Assert.assertTrue(results.exists("c:\\data\\randombytes-2"));
+
+        TestRunner runner = TestRunners.newTestRunner(ListFTP.class);
+        runner.setProperty(ListFTP.HOSTNAME, "localhost");
+        runner.setProperty(ListFTP.USERNAME, username);
+        runner.setProperty(FTPTransfer.PASSWORD, password);
+        runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
+        runner.setProperty(ListFTP.REMOTE_PATH, "/");
+        runner.assertValid();
+
+        runner.run();
+        // Read the output through the relationship of the processor under test (ListFTP), not FetchFTP's
+        final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListFTP.REL_SUCCESS).get(0);
+        runner.assertAllFlowFilesContainAttribute("ftp.remote.host");
+        runner.assertAllFlowFilesContainAttribute("ftp.remote.port");
+        runner.assertAllFlowFilesContainAttribute("ftp.listing.user");
+        runner.assertAllFlowFilesContainAttribute(ListFile.FILE_OWNER_ATTRIBUTE);
+        runner.assertAllFlowFilesContainAttribute(ListFile.FILE_GROUP_ATTRIBUTE);
+        runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE);
+        runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE);
+        runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE);
+        retrievedFile.assertAttributeEquals("ftp.listing.user", username);
+        retrievedFile.assertAttributeEquals("filename", "randombytes-2");
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
index 2b78a4b..4965893 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -282,6 +282,11 @@ public class TestFetchFileTransfer {
                 }
 
                 @Override
+                public boolean flush(FlowFile flowFile) throws IOException {
+                    return true;
+                }
+
+                @Override
                 public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
                     return null;
                 }
@@ -292,7 +297,7 @@ public class TestFetchFileTransfer {
                 }
 
                 @Override
-                public void deleteFile(String path, String remoteFileName) throws IOException {
+                public void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException {
                     if (!allowDelete) {
                         throw new PermissionDeniedException("test permission denied");
                     }
@@ -305,7 +310,7 @@ public class TestFetchFileTransfer {
                 }
 
                 @Override
-                public void rename(String source, String target) throws IOException {
+                public void rename(FlowFile flowFile, String source, String target) throws IOException {
                     if (!allowRename) {
                         throw new PermissionDeniedException("test permission denied");
                     }
@@ -319,7 +324,7 @@ public class TestFetchFileTransfer {
                 }
 
                 @Override
-                public void deleteDirectory(String remoteDirectoryName) throws IOException {
+                public void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException {
 
                 }