You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/01/13 16:13:17 UTC

[nifi] branch support/nifi-1.15 updated (6ead245 -> 667cfcd)

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a change to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 6ead245  NIFI-9552: Make sure cl-over-slf4j is included under ext/ranger/install/lib directory
     new 20fa5c9  NIFI-7089 Upgraded Apache SSHD from 1.7.0 to 2.8.0
     new 667cfcd  NIFI-9549: Delegate NonFlushableOutputStream write methods to wrapped OutputStream

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../nifi/stream/io/NonFlushableOutputStream.java   |  15 ++
 .../nifi-standard-processors/pom.xml               |  16 +-
 .../nifi/processors/standard/TestGetSFTP.java      |  12 +-
 .../nifi/processors/standard/TestListSFTP.java     | 224 +++++----------------
 .../nifi/processors/standard/TestPutSFTP.java      |   5 -
 .../processors/standard/util/SSHTestServer.java    |  34 ++--
 nifi-nar-bundles/nifi-standard-bundle/pom.xml      |  10 +
 7 files changed, 100 insertions(+), 216 deletions(-)

[nifi] 01/02: NIFI-7089 Upgraded Apache SSHD from 1.7.0 to 2.8.0

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 20fa5c90db5df54b324a827437098dea2d33da0c
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Fri Jan 7 11:49:33 2022 -0600

    NIFI-7089 Upgraded Apache SSHD from 1.7.0 to 2.8.0
    
    - Replaced fake-sftp-server-rule with test Apache SSHD Server
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #5646.
---
 .../nifi-standard-processors/pom.xml               |  16 +-
 .../nifi/processors/standard/TestGetSFTP.java      |  12 +-
 .../nifi/processors/standard/TestListSFTP.java     | 224 +++++----------------
 .../nifi/processors/standard/TestPutSFTP.java      |   5 -
 .../processors/standard/util/SSHTestServer.java    |  34 ++--
 nifi-nar-bundles/nifi-standard-bundle/pom.xml      |  10 +
 6 files changed, 85 insertions(+), 216 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 18859c5..73864e5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -329,12 +329,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.sshd</groupId>
-            <artifactId>sshd-core</artifactId>
-            <version>1.7.0</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>com.bazaarvoice.jolt</groupId>
             <artifactId>jolt-core</artifactId>
         </dependency>
@@ -387,9 +381,13 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.github.stefanbirkner</groupId>
-            <artifactId>fake-sftp-server-rule</artifactId>
-            <version>2.0.1</version>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-sftp</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java
index 5d063c7..79d57cd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java
@@ -26,8 +26,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -36,8 +34,6 @@ import java.nio.file.Paths;
 
 public class TestGetSFTP {
 
-    private static final Logger logger = LoggerFactory.getLogger(TestGetSFTP.class);
-
     private TestRunner getSFTPRunner;
     private static SSHTestServer sshTestServer;
 
@@ -90,7 +86,7 @@ public class TestGetSFTP {
         //Verify files deleted
         for(int i=1;i<5;i++){
             Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt");
-            Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists());
+            Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
         }
 
         getSFTPRunner.clearTransferState();
@@ -114,7 +110,7 @@ public class TestGetSFTP {
             // Verify files deleted
             for (int i = 1; i < 3; i++) {
                 Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt");
-                Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists());
+                Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
             }
 
             getSFTPRunner.clearTransferState();
@@ -140,10 +136,10 @@ public class TestGetSFTP {
 
         //Verify non-dotted files were deleted and dotted files were not deleted
         Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile1.txt");
-        Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists());
+        Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
 
         file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile3.txt");
-        Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists());
+        Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
 
         file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/.testFile2.txt");
         Assert.assertTrue("File deleted.", file1.toAbsolutePath().toFile().exists());
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
index 11e1ff51..18ec637 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
@@ -14,193 +14,73 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.processors.standard;
 
-import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
-
+import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.nio.file.Files;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
 
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.VerifiableProcessor;
-import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
-import org.apache.nifi.processors.standard.util.FileInfo;
-import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
+import org.apache.nifi.processors.standard.util.SSHTestServer;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.Rule;
-import java.security.SecureRandom;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class TestListSFTP {
-    @Rule
-    public final FakeSftpServerRule sftpServer = new FakeSftpServerRule();
-    int port;
-
-    final String username = "nifi-sftp-user";
-    final String password = "Test test test chocolate";
-
-    @Before
-    public void setUp() throws Exception {
-        sftpServer.addUser(username, password);
-        port = sftpServer.getPort();
-
-
-        sftpServer.putFile("/directory/smallfile.txt", "byte", StandardCharsets.UTF_8);
+    private static final String REMOTE_DIRECTORY = "/";
 
-        sftpServer.putFile("/directory/file.txt", "a bit more content in this file", StandardCharsets.UTF_8);
-
-        byte[] bytes = new byte[120];
-        SecureRandom.getInstanceStrong().nextBytes(bytes);
-
-        sftpServer.putFile("/directory/file.bin", bytes);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        sftpServer.deleteAllFilesAndDirectories();
-    }
+    private static final byte[] FILE_CONTENTS = String.class.getName().getBytes(StandardCharsets.UTF_8);
 
-    @Test(timeout = 5000)
-    public void testListingWhileConcurrentlyWritingIntoMultipleDirectories() throws Exception {
-        AtomicInteger fileCounter = new AtomicInteger(1);
+    private TestRunner runner;
 
-        List<String> createdFileNames = new ArrayList<>();
+    private SSHTestServer sshServer;
 
-        CountDownLatch finishScheduledRun = new CountDownLatch(1);
-        CountDownLatch reachScanningSubDir = new CountDownLatch(1);
-        CountDownLatch writeMoreFiles = new CountDownLatch(1);
+    private String tempFileName;
 
-        String baseDir = "/base/";
-        String subDir = "/base/subdir/";
-
-        TestRunner runner = TestRunners.newTestRunner(new ListSFTP() {
-            @Override
-            protected FileTransfer getFileTransfer(ProcessContext context) {
-                return new SFTPTransfer(context, getLogger()){
-                    @Override
-                    protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing, boolean applyFilters) throws IOException {
-                        if (path.contains("subdir")) {
-                            reachScanningSubDir.countDown();
-                            try {
-                                writeMoreFiles.await();
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException(e);
-                            }
-                        }
+    @Before
+    public void setUp() throws Exception {
+        sshServer = new SSHTestServer();
+        sshServer.startServer();
 
-                        super.getListing(path, depth, maxResults, listing, applyFilters);
-                    }
-                };
-            }
-        });
+        writeTempFile();
 
-        // This test fails with BY_TIMESTAMPS
-//        runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS.getValue());
-        runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue());
-        runner.setProperty(ListSFTP.HOSTNAME, "localhost");
-        runner.setProperty(ListSFTP.USERNAME, username);
-        runner.setProperty(SFTPTransfer.PASSWORD, password);
-        runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
-        runner.setProperty(ListSFTP.REMOTE_PATH, baseDir);
-        runner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true");
+        runner = TestRunners.newTestRunner(ListSFTP.class);
+        runner.setProperty(ListSFTP.HOSTNAME, sshServer.getHost());
+        runner.setProperty(ListSFTP.USERNAME, sshServer.getUsername());
+        runner.setProperty(SFTPTransfer.PASSWORD, sshServer.getPassword());
+        runner.setProperty(FTPTransfer.PORT, Integer.toString(sshServer.getSSHPort()));
+        runner.setProperty(ListSFTP.REMOTE_PATH, REMOTE_DIRECTORY);
+        runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
 
         runner.assertValid();
-
-        ExecutorService executorService = null;
-        try {
-            executorService = Executors.newFixedThreadPool(1);
-            sftpServer.createDirectory("/base");
-
-            uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
-            uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);
-
-            executorService.submit(() -> {
-                try {
-                    runner.run(1, false);
-                } finally {
-                    finishScheduledRun.countDown();
-                }
-            });
-
-            reachScanningSubDir.await();
-
-            uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
-            Thread.sleep(1100); // Make sure the next file has greater timestamp
-            uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);
-
-            writeMoreFiles.countDown();
-
-            Thread.sleep(1100); // Need to wait for 1+ sec if the file timestamps have only sec precision.
-            finishScheduledRun.await();
-            runner.run();
-
-            List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-
-            List<String> successFileNames = successFiles.stream()
-                .map(MockFlowFile::getAttributes)
-                .map(attributes -> attributes.get("filename"))
-                .sorted()
-                .collect(Collectors.toList());
-
-            Collections.sort(createdFileNames);
-
-            assertEquals(createdFileNames, successFileNames);
-        } finally {
-            if (executorService != null) {
-                executorService.shutdown();
-            }
-        }
+        assertVerificationSuccess();
     }
 
-    private void uploadFile(String baseDir, Object fileSuffix, List<String> createdFileNames) throws Exception {
-        String fileName = "file." + fileSuffix;
-
-        sftpServer.putFile(baseDir + fileName, "unimportant", StandardCharsets.UTF_8);
-
-        createdFileNames.add(fileName);
+    @After
+    public void tearDown() throws Exception {
+        sshServer.stopServer();
     }
 
     @Test
-    public void basicFileList() throws InterruptedException {
-        TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);
-        runner.setProperty(ListSFTP.HOSTNAME, "localhost");
-        runner.setProperty(ListSFTP.USERNAME, username);
-        runner.setProperty(SFTPTransfer.PASSWORD, password);
-        runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
-        runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/");
-
-        runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
-        runner.assertValid();
-
-        // Ensure wait for enough lag time.
-        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
-
+    public void testRunFileFound() {
         runner.run();
-        assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects.  Of those, 3 match the filter.");
-
-        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
 
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
         runner.assertAllFlowFilesContainAttribute("sftp.remote.host");
         runner.assertAllFlowFilesContainAttribute("sftp.remote.port");
         runner.assertAllFlowFilesContainAttribute("sftp.listing.user");
@@ -212,46 +92,34 @@ public class TestListSFTP {
         runner.assertAllFlowFilesContainAttribute( "filename");
 
         final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
-        retrievedFile.assertAttributeEquals("sftp.listing.user", username);
+        retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
+        retrievedFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), tempFileName);
     }
 
-
     @Test
-    public void sizeFilteredFileList() throws InterruptedException {
-        TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);
-        runner.setProperty(ListSFTP.HOSTNAME, "localhost");
-        runner.setProperty(ListSFTP.USERNAME, username);
-        runner.setProperty(SFTPTransfer.PASSWORD, password);
-        runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
-        runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/");
-        runner.setProperty(ListFile.MIN_SIZE, "8B");
-        runner.setProperty(ListFile.MAX_SIZE, "100B");
-
-
-        runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
-        runner.assertValid();
-
-        // Ensure wait for enough lag time.
-        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
+    public void testRunFileNotFoundMinSizeFiltered() {
+        runner.setProperty(ListFile.MIN_SIZE, "1KB");
 
         runner.run();
 
-        assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects.  Of those, 1 matches the filter.");
-        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
-
-        final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
-        //the only file between the limits
-        retrievedFile.assertAttributeEquals("filename", "file.txt");
+        runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
     }
 
-    private void assertVerificationOutcome(final TestRunner runner, final Outcome expectedOutcome, final String expectedExplanationRegex) {
+    private void assertVerificationSuccess() {
         final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
                 .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
-
         assertEquals(1, results.size());
         final ConfigVerificationResult result = results.get(0);
-        assertEquals(expectedOutcome, result.getOutcome());
-        assertTrue(String.format("Expected verification result to match pattern [%s].  Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()),
-                result.getExplanation().matches(expectedExplanationRegex));
+        assertEquals(Outcome.SUCCESSFUL, result.getOutcome());
+    }
+
+    private void writeTempFile() {
+        final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
+        try {
+            Files.write(file.toPath(), FILE_CONTENTS);
+            tempFileName = file.getName();
+        } catch (final IOException e) {
+            throw new UncheckedIOException(e);
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
index 36aa2ed..ac5f163 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
@@ -28,8 +28,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,9 +38,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class TestPutSFTP {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestPutSFTP.class);
-
     private TestRunner putSFTPRunner;
     private static SSHTestServer sshTestServer;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java
index f3d810b..f9747c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java
@@ -17,12 +17,10 @@
 package org.apache.nifi.processors.standard.util;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.sshd.common.NamedFactory;
 import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory;
-import org.apache.sshd.server.Command;
 import org.apache.sshd.server.SshServer;
 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
-import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+import org.apache.sshd.sftp.server.SftpSubsystemFactory;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -32,6 +30,16 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class SSHTestServer {
+    private static SshServer sshd;
+
+    private String virtualFileSystemPath = "target/ssh_vfs/";
+
+    private String host = "127.0.0.1";
+
+    private String username = "nifiuser";
+
+    private String password = "nifipassword";
+
     public int getSSHPort(){
         return sshd.getPort();
     }
@@ -60,19 +68,13 @@ public class SSHTestServer {
         this.password = password;
     }
 
-    private static SshServer sshd;
-    private String virtualFileSystemPath = "target/ssh_vfs/";
-
-    private String username = "nifiuser";
-    private String password = "nifipassword";
-
-    public void SSHTestServer(){
-
+    public String getHost() {
+        return host;
     }
 
     public void startServer() throws IOException {
         sshd = SshServer.setUpDefaultServer();
-        sshd.setHost("localhost");
+        sshd.setHost(host);
 
         sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
 
@@ -88,8 +90,7 @@ public class SSHTestServer {
         Files.createDirectories(dir);
         sshd.setFileSystemFactory(new VirtualFileSystemFactory(dir.toAbsolutePath()));
 
-        //Add SFTP support
-        List<NamedFactory<Command>> sftpCommandFactory = new ArrayList<>();
+        List<SftpSubsystemFactory> sftpCommandFactory = new ArrayList<>();
         sftpCommandFactory.add(new SftpSubsystemFactory());
         sshd.setSubsystemFactories(sftpCommandFactory);
 
@@ -97,10 +98,11 @@ public class SSHTestServer {
     }
 
     public void stopServer() throws IOException {
-        if(sshd == null) return;
+        if (sshd == null) {
+            return;
+        }
         sshd.stop(true);
 
-        //Delete Virtual File System folder
         Path dir = Paths.get(getVirtualFileSystemPath());
         FileUtils.deleteDirectory(dir.toFile());
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index a3e9a76..ba9dc4e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -392,6 +392,16 @@
                 <artifactId>snakeyaml</artifactId>
                 <version>1.29</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.sshd</groupId>
+                <artifactId>sshd-core</artifactId>
+                <version>2.8.0</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.sshd</groupId>
+                <artifactId>sshd-sftp</artifactId>
+                <version>2.8.0</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>

[nifi] 02/02: NIFI-9549: Delegate NonFlushableOutputStream write methods to wrapped OutputStream

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 667cfcd780cde9876fe09e43384207a79834f54c
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Jan 6 16:34:01 2022 -0500

    NIFI-9549: Delegate NonFlushableOutputStream write methods to wrapped OutputStream
    
    Ensure that we delegate calls to write(byte[]) and write(byte[], int, int) to the underlying OutputStream for NonFlushableOutputStream, instead of allowing FilterOutputStream to iterate over every byte
    
    This closes #5642
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../apache/nifi/stream/io/NonFlushableOutputStream.java   | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonFlushableOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonFlushableOutputStream.java
index e951064..335d864 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonFlushableOutputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonFlushableOutputStream.java
@@ -34,4 +34,19 @@ public class NonFlushableOutputStream extends FilterOutputStream {
     public void close() throws IOException {
         out.close();
     }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        out.write(b);
+    }
 }