You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/04/21 19:29:24 UTC

[nifi] branch main updated: NIFI-9949 This closes #5988. Corrected intermittent failures in TestPutSFTP

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b7f8cbe1f NIFI-9949 This closes #5988. Corrected intermittent failures in TestPutSFTP
4b7f8cbe1f is described below

commit 4b7f8cbe1f860dc99d72a86ea3c211c86b2f7aa8
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Apr 21 13:41:08 2022 -0500

    NIFI-9949 This closes #5988. Corrected intermittent failures in TestPutSFTP
    
    - Changed SSH server to start and stop after each method
    - Replaced queued file with string FlowFile contents
    - Refactored TestPutSFTP using JUnit 5
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../nifi/processors/standard/TestPutSFTP.java      | 319 +++++++++------------
 1 file changed, 137 insertions(+), 182 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
index ac5f163720..c5ff8f327d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
@@ -17,278 +17,233 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 import org.apache.nifi.processors.standard.util.SSHTestServer;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-public class TestPutSFTP {
-    private TestRunner putSFTPRunner;
-    private static SSHTestServer sshTestServer;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-    private final String testFile = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "hello.txt";
+class TestPutSFTP {
+    private static final String FLOW_FILE_CONTENTS = TestPutSFTP.class.getSimpleName();
 
-    @BeforeClass
-    public static void setupSSHD() throws IOException {
+    private SSHTestServer sshTestServer;
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() throws IOException {
         sshTestServer = new SSHTestServer();
         sshTestServer.startServer();
-    }
 
-    @AfterClass
-    public static void cleanupSSHD() throws IOException {
-        sshTestServer.stopServer();
+        runner = TestRunners.newTestRunner(PutSFTP.class);
+        runner.setProperty(SFTPTransfer.HOSTNAME, "localhost");
+        runner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort()));
+        runner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername());
+        runner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword());
+        runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false");
+        runner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
+        runner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/");
+        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
+        runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
+        runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true");
+        runner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec");
+        runner.setValidateExpressionUsage(false);
     }
 
-    @Before
-    public void setup(){
-        putSFTPRunner = TestRunners.newTestRunner(PutSFTP.class);
-        putSFTPRunner.setProperty(SFTPTransfer.HOSTNAME, "localhost");
-        putSFTPRunner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort()));
-        putSFTPRunner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername());
-        putSFTPRunner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword());
-        putSFTPRunner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false");
-        putSFTPRunner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
-        putSFTPRunner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/");
-        putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
-        putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
-        putSFTPRunner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true");
-        putSFTPRunner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec");
-        putSFTPRunner.setValidateExpressionUsage(false);
+    @AfterEach
+    void clearDirectory() throws IOException {
+        sshTestServer.stopServer();
+        final Path fileSystemPath = Paths.get(sshTestServer.getVirtualFileSystemPath());
+        FileUtils.deleteQuietly(fileSystemPath.toFile());
     }
 
     @Test
-    public void testPutSFTPFile() throws IOException {
-        emptyTestDirectory();
+    void testRunNewDirectory() {
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
+        runner.run();
 
-        Map<String,String> attributes = new HashMap<>();
-        attributes.put("filename", "testfile.txt");
-
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-        putSFTPRunner.run();
-
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
 
         //verify directory exists
         Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/");
-        Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/testfile.txt");
-        Assert.assertTrue("New directory not created.", newDirectory.toAbsolutePath().toFile().exists());
-        Assert.assertTrue("New File not created.", newFile.toAbsolutePath().toFile().exists());
-        putSFTPRunner.clearTransferState();
+        Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/1.txt");
+        assertTrue(newDirectory.toAbsolutePath().toFile().exists(), "New Directory not created");
+        assertTrue(newFile.toAbsolutePath().toFile().exists(), "New File not created");
+        runner.clearTransferState();
     }
 
     @Test
-    public void testPutSFTPFileZeroByte() throws IOException {
-        emptyTestDirectory();
+    void testRunZeroByteFile() {
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
+        runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt"));
 
-        Map<String,String> attributes = new HashMap<>();
-        attributes.put("filename", "testfile.txt");
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile1.txt");
-        putSFTPRunner.enqueue("", attributes);
-
-        putSFTPRunner.run();
+        runner.run();
 
         //Two files in batch, should have only 1 transferred to sucess, 1 to failure
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1);
-        putSFTPRunner.clearTransferState();
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
+        runner.clearTransferState();
 
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile1.txt");
-        putSFTPRunner.enqueue("", attributes);
+        runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
 
-        putSFTPRunner.run();
+        runner.run();
 
         //One files in batch, should have 0 transferred to output since it's zero byte
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1);
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
-        putSFTPRunner.clearTransferState();
+        runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
+        runner.clearTransferState();
 
         //allow zero byte files
-        putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
+        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
 
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile1.txt");
-        putSFTPRunner.enqueue("", attributes);
+        runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
 
-        putSFTPRunner.run();
+        runner.run();
 
         //should have 1 transferred to sucess
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
 
         //revert settings
-        putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
-        putSFTPRunner.clearTransferState();
+        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
+        runner.clearTransferState();
     }
 
     @Test
-    public void testPutSFTPFileConflictResolution() throws IOException {
-        emptyTestDirectory();
+    void testRunConflictResolution() throws IOException {
+        final String directoryName = "nifi_test";
+        final String filename = "1";
 
-        //Try transferring file with the same name as a directory, should fail in all cases
-        // except RESOLUTION of NONE
-        Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test" );
-        Path dir2 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/testfile" );
-        Files.createDirectory(dir);
-        Files.createDirectory(dir2);
+        final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + directoryName );
+        final Path subDirectory = Paths.get(directory.toString(), filename);
+        Files.createDirectory(directory);
+        Files.createFile(subDirectory);
 
-        Map<String,String> attributes = new HashMap<>();
-        attributes.put("filename", "testfile");
-
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-        putSFTPRunner.run();
-
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_FAILURE, 1);
+        final Map<String, String> flowFileAttributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), filename);
 
-        //Prepare by uploading test file
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile.txt");
+        // REPLACE Strategy
+        runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
 
-        putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
+        runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+        runner.run();
 
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-        putSFTPRunner.run();
-        putSFTPRunner.clearTransferState();
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
+        runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
+        runner.clearTransferState();
 
-        //set conflict resolution mode to REJECT
-        putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT);
+        // REJECT Strategy
+        runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT);
 
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-        putSFTPRunner.run();
+        runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+        runner.run();
 
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1);
-        putSFTPRunner.clearTransferState();
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
+        runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
+        runner.clearTransferState();
 
-        //set conflict resolution mode to IGNORE
-        putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE);
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-        putSFTPRunner.run();
+        // IGNORE Strategy
+        runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE);
+        runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+        runner.run();
 
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0);
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
+        runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
 
-        putSFTPRunner.clearTransferState();
+        runner.clearTransferState();
 
-        //set conflict resolution mode to FAIL
-        putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL);
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-        putSFTPRunner.run();
+        // FAIL Strategy
+        runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL);
+        runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+        runner.run();
 
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0);
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_FAILURE, 1);
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
+        runner.assertTransferCount(PutSFTP.REL_FAILURE, 1);
 
-        putSFTPRunner.clearTransferState();
+        runner.clearTransferState();
     }
 
     @Test
-    public void testPutSFTPBatching() throws IOException {
-        emptyTestDirectory();
+    void testRunBatching() {
+        runner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
 
-        Map<String,String> attributes = new HashMap<>();
-        attributes.put("filename", "testfile.txt");
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt"));
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "3.txt"));
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "4.txt"));
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "5.txt"));
 
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile2.txt");
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
+        runner.run();
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
 
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile3.txt");
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
+        runner.clearTransferState();
 
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile4.txt");
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
+        runner.run();
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
 
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile5.txt");
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-
-        putSFTPRunner.run();
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-
-        putSFTPRunner.clearTransferState();
-
-        putSFTPRunner.run();
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-
-        putSFTPRunner.clearTransferState();
+        runner.clearTransferState();
 
-        putSFTPRunner.run();
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
-        putSFTPRunner.clearTransferState();
+        runner.run();
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
+        runner.clearTransferState();
     }
 
     @Test
-    public void testPutSFTPProvenanceTransitUri() throws IOException {
-        emptyTestDirectory();
-
-        putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
+    void testRunTransitUri() {
+        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
         Map<String,String> attributes = new HashMap<>();
         attributes.put("filename", "testfile.txt");
         attributes.put("transfer-host","localhost");
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
+        runner.enqueue(FLOW_FILE_CONTENTS, attributes);
 
         attributes = new HashMap<>();
         attributes.put("filename", "testfile1.txt");
         attributes.put("transfer-host","127.0.0.1");
 
-        putSFTPRunner.enqueue(Paths.get(testFile), attributes);
-        putSFTPRunner.run();
+        runner.enqueue(FLOW_FILE_CONTENTS, attributes);
+        runner.run();
 
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-        putSFTPRunner.getProvenanceEvents().forEach(k->{
-            assert(k.getTransitUri().contains("sftp://localhost"));
-        });
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
+        runner.getProvenanceEvents().forEach(k -> assertTrue(k.getTransitUri().contains("sftp://localhost")));
         //Two files in batch, should have 2 transferred to success, 0 to failure
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0);
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
+        runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
 
-        MockFlowFile flowFile1 = putSFTPRunner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0);
-        MockFlowFile flowFile2 = putSFTPRunner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1);
-        putSFTPRunner.clearProvenanceEvents();
-        putSFTPRunner.clearTransferState();
+        MockFlowFile flowFile1 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0);
+        MockFlowFile flowFile2 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1);
+        runner.clearProvenanceEvents();
+        runner.clearTransferState();
 
         //Test different destinations on flow file attributes
-        putSFTPRunner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname
-
-        putSFTPRunner.setThreadCount(1);
-        putSFTPRunner.enqueue(flowFile1);
-        putSFTPRunner.enqueue(flowFile2);
-        putSFTPRunner.run();
+        runner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname
 
-        putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-        assert(putSFTPRunner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost"));
-        assert(putSFTPRunner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1"));
+        runner.setThreadCount(1);
+        runner.enqueue(flowFile1);
+        runner.enqueue(flowFile2);
+        runner.run();
 
-        putSFTPRunner.clearProvenanceEvents();
-        putSFTPRunner.clearTransferState();
-    }
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
+        assertTrue(runner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost"));
+        assertTrue(runner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1"));
 
-    private void emptyTestDirectory() throws IOException {
-        //Delete Virtual File System folder
-        Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath());
-        FileUtils.cleanDirectory(dir.toFile());
+        runner.clearProvenanceEvents();
+        runner.clearTransferState();
     }
-
 }
\ No newline at end of file