You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/03/17 12:47:50 UTC

[nifi] 14/47: NIFI-7222 Cleaned up API for FTP/SFTP remote file retrieval and ensure we close remote file resources for SFTP pulls in particular

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 4c78d1e92b77568108c8fe8da9c086040ad41213
Author: Joe Witt <jo...@apache.org>
AuthorDate: Wed Mar 4 23:41:45 2020 -0500

    NIFI-7222 Cleaned up API for FTP/SFTP remote file retrieval and ensure we close remote file resources for SFTP pulls in particular
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4115.
---
 .../processors/standard/FetchFileTransfer.java     | 18 +------
 .../nifi/processors/standard/GetFileTransfer.java  | 10 ++--
 .../nifi/processors/standard/util/FTPTransfer.java | 62 ++++++++++++----------
 .../processors/standard/util/FileTransfer.java     | 10 ++--
 .../processors/standard/util/SFTPTransfer.java     | 57 ++++++++++++--------
 .../processors/standard/TestFetchFileTransfer.java | 31 +++++------
 .../util/ITestSFTPTransferWithSSHTestServer.java   | 23 --------
 7 files changed, 91 insertions(+), 120 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index b7e8f61..b975548 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -31,19 +31,15 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.PermissionDeniedException;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.Tuple;
 
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -261,20 +257,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         boolean closeConnection = false;
         try {
             // Pull data from remote system.
-            final InputStream in;
             try {
-                in = transfer.getInputStream(filename, flowFile);
-
-                flowFile = session.write(flowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws IOException {
-                        StreamUtils.copy(in, out);
-                    }
-                });
-
-                if (!transfer.flush(flowFile)) {
-                    throw new IOException("completePendingCommand returned false, file transfer failed");
-                }
+                flowFile = transfer.getRemoteFile(filename, flowFile, session);
 
             } catch (final FileNotFoundException e) {
                 closeConnection = false;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
index 4ce31de..8ac5a27 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.file.Path;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -189,12 +188,9 @@ public abstract class GetFileTransfer extends AbstractProcessor {
                 try {
                     FlowFile flowFile = session.create();
                     final StopWatch stopWatch = new StopWatch(false);
-                    try (final InputStream in = transfer.getInputStream(file.getFullPathFileName())) {
-                        stopWatch.start();
-                        flowFile = session.importFrom(in, flowFile);
-                        stopWatch.stop();
-                    }
-                    transfer.flush();
+                    stopWatch.start();
+                    flowFile = transfer.getRemoteFile(file.getFullPathFileName(), flowFile, session);
+                    stopWatch.stop();
                     final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                     final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
                     flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 6a2b2db..e7d96c6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Proxy;
@@ -50,10 +51,13 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.stream.io.StreamUtils;
 
 public class FTPTransfer implements FileTransfer {
 
@@ -314,35 +318,39 @@ public class FTPTransfer implements FileTransfer {
     }
 
     @Override
-    public InputStream getInputStream(String remoteFileName) throws IOException {
-        return getInputStream(remoteFileName, null);
-    }
-
-    @Override
-    public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
-        final FTPClient client = getClient(flowFile);
-        InputStream in = client.retrieveFileStream(remoteFileName);
-        if (in == null) {
-            final String response = client.getReplyString();
-            // FTPClient doesn't throw exception if file not found.
-            // Instead, response string will contain: "550 Can't open <absolute_path>: No such file or directory"
-            if (response != null && response.trim().endsWith("No such file or directory")){
-                throw new FileNotFoundException(response);
+    public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException {
+        final FTPClient client = getClient(origFlowFile);
+        InputStream in = null;
+        FlowFile resultFlowFile = null;
+        try {
+            in = client.retrieveFileStream(remoteFileName);
+            if (in == null) {
+                final String response = client.getReplyString();
+                // FTPClient doesn't throw exception if file not found.
+                // Instead, response string will contain: "550 Can't open <absolute_path>: No such file or directory"
+                if (response != null && response.trim().endsWith("No such file or directory")) {
+                    throw new FileNotFoundException(response);
+                }
+                throw new IOException(response);
+            }
+            final InputStream remoteIn = in;
+            resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    StreamUtils.copy(remoteIn, out);
+                }
+            });
+            client.completePendingCommand();
+            return resultFlowFile;
+        } finally {
+            if(in != null){
+                try{
+                    in.close();
+                }catch(final IOException ioe){
+                    //do nothing
+                }
             }
-            throw new IOException(response);
         }
-        return in;
-    }
-
-    @Override
-    public void flush() throws IOException {
-        final FTPClient client = getClient(null);
-        client.completePendingCommand();
-    }
-
-    @Override
-    public boolean flush(final FlowFile flowFile) throws IOException {
-        return getClient(flowFile).completePendingCommand();
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index 56dd22d..64bb130 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -26,6 +26,8 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 public interface FileTransfer extends Closeable {
@@ -34,13 +36,7 @@ public interface FileTransfer extends Closeable {
 
     List<FileInfo> getListing() throws IOException;
 
-    InputStream getInputStream(String remoteFileName) throws IOException;
-
-    InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException;
-
-    void flush() throws IOException;
-
-    boolean flush(FlowFile flowFile) throws IOException;
+    FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException;
 
     FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index 465bdde..3a341bc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -45,18 +45,23 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.stream.io.StreamUtils;
 
 import javax.net.SocketFactory;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.Proxy;
 import java.net.Socket;
@@ -346,19 +351,22 @@ public class SFTPTransfer implements FileTransfer {
     }
 
     @Override
-    public InputStream getInputStream(final String remoteFileName) throws IOException {
-        return getInputStream(remoteFileName, null);
-    }
-
-    @Override
-    public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
-        final SFTPClient sftpClient = getSFTPClient(flowFile);
+    public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException {
+        final SFTPClient sftpClient = getSFTPClient(origFlowFile);
+        RemoteFile rf = null;
+        RemoteFile.ReadAheadRemoteFileInputStream rfis = null;
+        FlowFile resultFlowFile = null;
         try {
-            // The client has 'get' methods for downloading a file, but they don't offer a way to get access to an InputStream so
-            // this code is what the SFTPTransfer Downloader does to get a stream for the remote file contents
-            final RemoteFile rf = sftpClient.open(remoteFileName);
-            final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16);
-            return rfis;
+            rf = sftpClient.open(remoteFileName);
+            rfis = rf.new ReadAheadRemoteFileInputStream(16);
+            final InputStream in = rfis;
+            resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    StreamUtils.copy(in, out);
+                }
+            });
+            return resultFlowFile;
         } catch (final SFTPException e) {
             switch (e.getStatusCode()) {
                 case NO_SUCH_FILE:
@@ -368,20 +376,25 @@ public class SFTPTransfer implements FileTransfer {
                 default:
                     throw new IOException("Failed to obtain file content for " + remoteFileName, e);
             }
+        } finally {
+            if(rf != null){
+                try{
+                    rf.close();
+                }catch(final IOException ioe){
+                    //do nothing
+                }
+            }
+            if(rfis != null){
+                try{
+                    rfis.close();
+                }catch(final IOException ioe){
+                    //do nothing
+                }
+            }
         }
     }
 
     @Override
-    public void flush() throws IOException {
-        // nothing needed here
-    }
-
-    @Override
-    public boolean flush(final FlowFile flowFile) throws IOException {
-        return true;
-    }
-
-    @Override
     public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
         final SFTPClient sftpClient = getSFTPClient(flowFile);
         final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
index de76b07..68eb627 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,9 +34,13 @@ import java.util.Map;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.standard.util.FileInfo;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.PermissionDeniedException;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -285,12 +290,7 @@ public class TestFetchFileTransfer {
                 }
 
                 @Override
-                public InputStream getInputStream(final String remoteFileName) throws IOException {
-                    return getInputStream(remoteFileName, null);
-                }
-
-                @Override
-                public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException {
+                public FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException {
                     if (!allowAccess) {
                         throw new PermissionDeniedException("test permission denied");
                     }
@@ -299,17 +299,14 @@ public class TestFetchFileTransfer {
                     if (content == null) {
                         throw new FileNotFoundException();
                     }
-
-                    return new ByteArrayInputStream(content);
-                }
-
-                @Override
-                public void flush() throws IOException {
-                }
-
-                @Override
-                public boolean flush(FlowFile flowFile) throws IOException {
-                    return true;
+                    final InputStream in = new ByteArrayInputStream(content);
+                    flowFile = session.write(flowFile, new OutputStreamCallback() {
+                        @Override
+                        public void process(final OutputStream out) throws IOException {
+                            StreamUtils.copy(in, out);
+                        }
+                    });
+                    return flowFile;
                 }
 
                 @Override
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
index 17a3cda..de8c8b6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
@@ -311,29 +311,6 @@ public class ITestSFTPTransferWithSSHTestServer {
     }
 
     @Test
-    public void testGetInputStream() throws IOException {
-        final String filename = "./" + DIR_2 + "/" + FILE_1;
-        final Map<PropertyDescriptor, String> properties = createBaseProperties();
-
-        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
-            final InputStream in = transfer.getInputStream(filename)) {
-            final String content = IOUtils.toString(in, StandardCharsets.UTF_8);
-            assertEquals("dir2 file1", content);
-        }
-    }
-
-    @Test(expected = FileNotFoundException.class)
-    public void testGetInputStreamWhenFileDoesNotExist() throws IOException {
-        final String filename = "./" + DIR_2 + "/DOES-NOT-EXIST";
-        final Map<PropertyDescriptor, String> properties = createBaseProperties();
-
-        try(final SFTPTransfer transfer = createSFTPTransfer(properties);
-            final InputStream in = transfer.getInputStream(filename)) {
-            IOUtils.toString(in, StandardCharsets.UTF_8);
-        }
-    }
-
-    @Test
     public void testDeleteFileWithoutPath() throws IOException {
         final Map<PropertyDescriptor, String> properties = createBaseProperties();
         properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);