You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text version.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/01/08 10:02:24 UTC
[nifi] branch main updated: NIFI-7089 Upgraded Apache SSHD from 1.7.0 to 2.8.0
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ca8bc17 NIFI-7089 Upgraded Apache SSHD from 1.7.0 to 2.8.0
ca8bc17 is described below
commit ca8bc17c0aa26a3ffd4ab171e964a60c1518361b
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Fri Jan 7 11:49:33 2022 -0600
NIFI-7089 Upgraded Apache SSHD from 1.7.0 to 2.8.0
- Replaced fake-sftp-server-rule with test Apache SSHD Server
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #5646.
---
.../nifi-standard-processors/pom.xml | 16 +-
.../nifi/processors/standard/TestGetSFTP.java | 12 +-
.../nifi/processors/standard/TestListSFTP.java | 224 +++++----------------
.../nifi/processors/standard/TestPutSFTP.java | 5 -
.../processors/standard/util/SSHTestServer.java | 34 ++--
nifi-nar-bundles/nifi-standard-bundle/pom.xml | 10 +
6 files changed, 85 insertions(+), 216 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 22c8362..bb16d87 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -324,12 +324,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.sshd</groupId>
- <artifactId>sshd-core</artifactId>
- <version>1.7.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>com.bazaarvoice.jolt</groupId>
<artifactId>jolt-core</artifactId>
</dependency>
@@ -382,9 +376,13 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.github.stefanbirkner</groupId>
- <artifactId>fake-sftp-server-rule</artifactId>
- <version>2.0.1</version>
+ <groupId>org.apache.sshd</groupId>
+ <artifactId>sshd-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sshd</groupId>
+ <artifactId>sshd-sftp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java
index 5d063c7..79d57cd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java
@@ -26,8 +26,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -36,8 +34,6 @@ import java.nio.file.Paths;
public class TestGetSFTP {
- private static final Logger logger = LoggerFactory.getLogger(TestGetSFTP.class);
-
private TestRunner getSFTPRunner;
private static SSHTestServer sshTestServer;
@@ -90,7 +86,7 @@ public class TestGetSFTP {
//Verify files deleted
for(int i=1;i<5;i++){
Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt");
- Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists());
+ Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
}
getSFTPRunner.clearTransferState();
@@ -114,7 +110,7 @@ public class TestGetSFTP {
// Verify files deleted
for (int i = 1; i < 3; i++) {
Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt");
- Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists());
+ Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
}
getSFTPRunner.clearTransferState();
@@ -140,10 +136,10 @@ public class TestGetSFTP {
//Verify non-dotted files were deleted and dotted files were not deleted
Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile1.txt");
- Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists());
+ Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile3.txt");
- Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists());
+ Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/.testFile2.txt");
Assert.assertTrue("File deleted.", file1.toAbsolutePath().toFile().exists());
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
index 11e1ff51..18ec637 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
@@ -14,193 +14,73 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.nifi.processors.standard;
-import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
-
+import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.VerifiableProcessor;
-import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FTPTransfer;
-import org.apache.nifi.processors.standard.util.FileInfo;
-import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
+import org.apache.nifi.processors.standard.util.SSHTestServer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.junit.Rule;
-import java.security.SecureRandom;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class TestListSFTP {
- @Rule
- public final FakeSftpServerRule sftpServer = new FakeSftpServerRule();
- int port;
-
- final String username = "nifi-sftp-user";
- final String password = "Test test test chocolate";
-
- @Before
- public void setUp() throws Exception {
- sftpServer.addUser(username, password);
- port = sftpServer.getPort();
-
-
- sftpServer.putFile("/directory/smallfile.txt", "byte", StandardCharsets.UTF_8);
+ private static final String REMOTE_DIRECTORY = "/";
- sftpServer.putFile("/directory/file.txt", "a bit more content in this file", StandardCharsets.UTF_8);
-
- byte[] bytes = new byte[120];
- SecureRandom.getInstanceStrong().nextBytes(bytes);
-
- sftpServer.putFile("/directory/file.bin", bytes);
- }
-
- @After
- public void tearDown() throws Exception {
- sftpServer.deleteAllFilesAndDirectories();
- }
+ private static final byte[] FILE_CONTENTS = String.class.getName().getBytes(StandardCharsets.UTF_8);
- @Test(timeout = 5000)
- public void testListingWhileConcurrentlyWritingIntoMultipleDirectories() throws Exception {
- AtomicInteger fileCounter = new AtomicInteger(1);
+ private TestRunner runner;
- List<String> createdFileNames = new ArrayList<>();
+ private SSHTestServer sshServer;
- CountDownLatch finishScheduledRun = new CountDownLatch(1);
- CountDownLatch reachScanningSubDir = new CountDownLatch(1);
- CountDownLatch writeMoreFiles = new CountDownLatch(1);
+ private String tempFileName;
- String baseDir = "/base/";
- String subDir = "/base/subdir/";
-
- TestRunner runner = TestRunners.newTestRunner(new ListSFTP() {
- @Override
- protected FileTransfer getFileTransfer(ProcessContext context) {
- return new SFTPTransfer(context, getLogger()){
- @Override
- protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing, boolean applyFilters) throws IOException {
- if (path.contains("subdir")) {
- reachScanningSubDir.countDown();
- try {
- writeMoreFiles.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
+ @Before
+ public void setUp() throws Exception {
+ sshServer = new SSHTestServer();
+ sshServer.startServer();
- super.getListing(path, depth, maxResults, listing, applyFilters);
- }
- };
- }
- });
+ writeTempFile();
- // This test fails with BY_TIMESTAMPS
-// runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS.getValue());
- runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue());
- runner.setProperty(ListSFTP.HOSTNAME, "localhost");
- runner.setProperty(ListSFTP.USERNAME, username);
- runner.setProperty(SFTPTransfer.PASSWORD, password);
- runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
- runner.setProperty(ListSFTP.REMOTE_PATH, baseDir);
- runner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true");
+ runner = TestRunners.newTestRunner(ListSFTP.class);
+ runner.setProperty(ListSFTP.HOSTNAME, sshServer.getHost());
+ runner.setProperty(ListSFTP.USERNAME, sshServer.getUsername());
+ runner.setProperty(SFTPTransfer.PASSWORD, sshServer.getPassword());
+ runner.setProperty(FTPTransfer.PORT, Integer.toString(sshServer.getSSHPort()));
+ runner.setProperty(ListSFTP.REMOTE_PATH, REMOTE_DIRECTORY);
+ runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
runner.assertValid();
-
- ExecutorService executorService = null;
- try {
- executorService = Executors.newFixedThreadPool(1);
- sftpServer.createDirectory("/base");
-
- uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
- uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);
-
- executorService.submit(() -> {
- try {
- runner.run(1, false);
- } finally {
- finishScheduledRun.countDown();
- }
- });
-
- reachScanningSubDir.await();
-
- uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
- Thread.sleep(1100); // Make sure the next file has greater timestamp
- uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);
-
- writeMoreFiles.countDown();
-
- Thread.sleep(1100); // Need to wait for 1+ sec if the file timestamps have only sec precision.
- finishScheduledRun.await();
- runner.run();
-
- List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-
- List<String> successFileNames = successFiles.stream()
- .map(MockFlowFile::getAttributes)
- .map(attributes -> attributes.get("filename"))
- .sorted()
- .collect(Collectors.toList());
-
- Collections.sort(createdFileNames);
-
- assertEquals(createdFileNames, successFileNames);
- } finally {
- if (executorService != null) {
- executorService.shutdown();
- }
- }
+ assertVerificationSuccess();
}
- private void uploadFile(String baseDir, Object fileSuffix, List<String> createdFileNames) throws Exception {
- String fileName = "file." + fileSuffix;
-
- sftpServer.putFile(baseDir + fileName, "unimportant", StandardCharsets.UTF_8);
-
- createdFileNames.add(fileName);
+ @After
+ public void tearDown() throws Exception {
+ sshServer.stopServer();
}
@Test
- public void basicFileList() throws InterruptedException {
- TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);
- runner.setProperty(ListSFTP.HOSTNAME, "localhost");
- runner.setProperty(ListSFTP.USERNAME, username);
- runner.setProperty(SFTPTransfer.PASSWORD, password);
- runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
- runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/");
-
- runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
- runner.assertValid();
-
- // Ensure wait for enough lag time.
- Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
-
+ public void testRunFileFound() {
runner.run();
- assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 3 match the filter.");
-
- runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
+ runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute("sftp.remote.host");
runner.assertAllFlowFilesContainAttribute("sftp.remote.port");
runner.assertAllFlowFilesContainAttribute("sftp.listing.user");
@@ -212,46 +92,34 @@ public class TestListSFTP {
runner.assertAllFlowFilesContainAttribute( "filename");
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
- retrievedFile.assertAttributeEquals("sftp.listing.user", username);
+ retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
+ retrievedFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), tempFileName);
}
-
@Test
- public void sizeFilteredFileList() throws InterruptedException {
- TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);
- runner.setProperty(ListSFTP.HOSTNAME, "localhost");
- runner.setProperty(ListSFTP.USERNAME, username);
- runner.setProperty(SFTPTransfer.PASSWORD, password);
- runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
- runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/");
- runner.setProperty(ListFile.MIN_SIZE, "8B");
- runner.setProperty(ListFile.MAX_SIZE, "100B");
-
-
- runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
- runner.assertValid();
-
- // Ensure wait for enough lag time.
- Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
+ public void testRunFileNotFoundMinSizeFiltered() {
+ runner.setProperty(ListFile.MIN_SIZE, "1KB");
runner.run();
- assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 1 matches the filter.");
- runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
-
- final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
- //the only file between the limits
- retrievedFile.assertAttributeEquals("filename", "file.txt");
+ runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
}
- private void assertVerificationOutcome(final TestRunner runner, final Outcome expectedOutcome, final String expectedExplanationRegex) {
+ private void assertVerificationSuccess() {
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
-
assertEquals(1, results.size());
final ConfigVerificationResult result = results.get(0);
- assertEquals(expectedOutcome, result.getOutcome());
- assertTrue(String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()),
- result.getExplanation().matches(expectedExplanationRegex));
+ assertEquals(Outcome.SUCCESSFUL, result.getOutcome());
+ }
+
+ private void writeTempFile() {
+ final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
+ try {
+ Files.write(file.toPath(), FILE_CONTENTS);
+ tempFileName = file.getName();
+ } catch (final IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
index 36aa2ed..ac5f163 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
@@ -28,8 +28,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -40,9 +38,6 @@ import java.util.HashMap;
import java.util.Map;
public class TestPutSFTP {
-
- private static final Logger logger = LoggerFactory.getLogger(TestPutSFTP.class);
-
private TestRunner putSFTPRunner;
private static SSHTestServer sshTestServer;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java
index f3d810b..f9747c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java
@@ -17,12 +17,10 @@
package org.apache.nifi.processors.standard.util;
import org.apache.commons.io.FileUtils;
-import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory;
-import org.apache.sshd.server.Command;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
-import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+import org.apache.sshd.sftp.server.SftpSubsystemFactory;
import java.io.IOException;
import java.nio.file.Files;
@@ -32,6 +30,16 @@ import java.util.ArrayList;
import java.util.List;
public class SSHTestServer {
+ private static SshServer sshd;
+
+ private String virtualFileSystemPath = "target/ssh_vfs/";
+
+ private String host = "127.0.0.1";
+
+ private String username = "nifiuser";
+
+ private String password = "nifipassword";
+
public int getSSHPort(){
return sshd.getPort();
}
@@ -60,19 +68,13 @@ public class SSHTestServer {
this.password = password;
}
- private static SshServer sshd;
- private String virtualFileSystemPath = "target/ssh_vfs/";
-
- private String username = "nifiuser";
- private String password = "nifipassword";
-
- public void SSHTestServer(){
-
+ public String getHost() {
+ return host;
}
public void startServer() throws IOException {
sshd = SshServer.setUpDefaultServer();
- sshd.setHost("localhost");
+ sshd.setHost(host);
sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
@@ -88,8 +90,7 @@ public class SSHTestServer {
Files.createDirectories(dir);
sshd.setFileSystemFactory(new VirtualFileSystemFactory(dir.toAbsolutePath()));
- //Add SFTP support
- List<NamedFactory<Command>> sftpCommandFactory = new ArrayList<>();
+ List<SftpSubsystemFactory> sftpCommandFactory = new ArrayList<>();
sftpCommandFactory.add(new SftpSubsystemFactory());
sshd.setSubsystemFactories(sftpCommandFactory);
@@ -97,10 +98,11 @@ public class SSHTestServer {
}
public void stopServer() throws IOException {
- if(sshd == null) return;
+ if (sshd == null) {
+ return;
+ }
sshd.stop(true);
- //Delete Virtual File System folder
Path dir = Paths.get(getVirtualFileSystemPath());
FileUtils.deleteDirectory(dir.toFile());
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index 05d2f23..018cd5b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -392,6 +392,16 @@
<artifactId>snakeyaml</artifactId>
<version>1.29</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.sshd</groupId>
+ <artifactId>sshd-core</artifactId>
+ <version>2.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sshd</groupId>
+ <artifactId>sshd-sftp</artifactId>
+ <version>2.8.0</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>