You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/12/02 09:08:27 UTC

[nifi] branch main updated: NIFI-10925 Refactored TestPutSFTP with discrete test methods

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c51411d360 NIFI-10925 Refactored TestPutSFTP with discrete test methods
c51411d360 is described below

commit c51411d360a8354a003bce3bd80a5cdec0d89d08
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Dec 1 14:44:32 2022 -0600

    NIFI-10925 Refactored TestPutSFTP with discrete test methods
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #6746.
---
 .../nifi/processors/standard/TestPutSFTP.java      | 213 +++++++++++----------
 1 file changed, 107 insertions(+), 106 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
index c5ff8f327d..e10f42b851 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
@@ -21,7 +21,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 import org.apache.nifi.processors.standard.util.SSHTestServer;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterEach;
@@ -33,14 +33,33 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 class TestPutSFTP {
     private static final String FLOW_FILE_CONTENTS = TestPutSFTP.class.getSimpleName();
 
+    private static final String LOCALHOST = "localhost";
+
+    private static final String LOCALHOST_ADDRESS = "127.0.0.1";
+
+    private static final String REMOTE_DIRECTORY = "nifi_test/";
+
+    private static final String FIRST_FILENAME = "1.txt";
+
+    private static final String TRANSFER_HOST_ATTRIBUTE = "transfer-host";
+
+    private static final int BATCH_SIZE = 2;
+
+    private static final byte[] ZERO_BYTES = new byte[]{};
+
+    private static final String TRANSIT_URI_FORMAT = "sftp://%s";
+
     private SSHTestServer sshTestServer;
 
     private TestRunner runner;
@@ -51,16 +70,16 @@ class TestPutSFTP {
         sshTestServer.startServer();
 
         runner = TestRunners.newTestRunner(PutSFTP.class);
-        runner.setProperty(SFTPTransfer.HOSTNAME, "localhost");
+        runner.setProperty(SFTPTransfer.HOSTNAME, LOCALHOST);
         runner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort()));
         runner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername());
         runner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword());
-        runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false");
-        runner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
-        runner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/");
-        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
+        runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, Boolean.FALSE.toString());
+        runner.setProperty(SFTPTransfer.BATCH_SIZE, Integer.toString(BATCH_SIZE));
+        runner.setProperty(SFTPTransfer.REMOTE_PATH, REMOTE_DIRECTORY);
+        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.TRUE.toString());
         runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
-        runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true");
+        runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, Boolean.TRUE.toString());
         runner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec");
         runner.setValidateExpressionUsage(false);
     }
@@ -74,176 +93,158 @@ class TestPutSFTP {
 
     @Test
     void testRunNewDirectory() {
-        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
         runner.run();
 
         runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
 
-        //verify directory exists
-        Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/");
-        Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/1.txt");
+        Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY);
+        Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY + FIRST_FILENAME);
         assertTrue(newDirectory.toAbsolutePath().toFile().exists(), "New Directory not created");
         assertTrue(newFile.toAbsolutePath().toFile().exists(), "New File not created");
         runner.clearTransferState();
     }
 
     @Test
-    void testRunZeroByteFile() {
-        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
-        runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt"));
-
+    void testRunZeroByteFileRejected() {
+        runner.enqueue(ZERO_BYTES, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
         runner.run();
 
-        //Two files in batch, should have only 1 transferred to sucess, 1 to failure
-        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
-        runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
-        runner.clearTransferState();
-
-        runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
-
-        runner.run();
-
-        //One files in batch, should have 0 transferred to output since it's zero byte
         runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
-        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        //allow zero byte files
-        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
-
-        runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
+    }
 
+    @Test
+    void testRunZeroByteFileAllowed() {
+        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.FALSE.toString());
+        runner.enqueue(ZERO_BYTES, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
         runner.run();
 
-        //should have 1 transferred to sucess
         runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
-
-        //revert settings
-        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
-        runner.clearTransferState();
     }
 
     @Test
-    void testRunConflictResolution() throws IOException {
-        final String directoryName = "nifi_test";
-        final String filename = "1";
+    void testRunConflictResolutionReplaceStrategy() throws IOException {
+        createRemoteFile();
 
-        final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + directoryName );
-        final Path subDirectory = Paths.get(directory.toString(), filename);
-        Files.createDirectory(directory);
-        Files.createFile(subDirectory);
-
-        final Map<String, String> flowFileAttributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), filename);
-
-        // REPLACE Strategy
         runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
-
-        runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
         runner.run();
 
         runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
         runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
         runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
-        runner.clearTransferState();
+    }
 
-        // REJECT Strategy
-        runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT);
+    @Test
+    void testRunConflictResolutionRejectStrategy() throws IOException {
+        createRemoteFile();
 
-        runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+        runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT);
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
         runner.run();
 
         runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
         runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
         runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
-        runner.clearTransferState();
+    }
+
+    @Test
+    void testRunConflictResolutionIgnoreStrategy() throws IOException {
+        createRemoteFile();
 
-        // IGNORE Strategy
         runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE);
-        runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
         runner.run();
 
         runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
         runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
         runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
+    }
 
-        runner.clearTransferState();
+    @Test
+    void testRunConflictResolutionFailStrategy() throws IOException {
+        createRemoteFile();
 
-        // FAIL Strategy
         runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL);
-        runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
         runner.run();
 
         runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
         runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
         runner.assertTransferCount(PutSFTP.REL_FAILURE, 1);
-
-        runner.clearTransferState();
     }
 
     @Test
     void testRunBatching() {
-        runner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
-
-        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
-        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt"));
-        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "3.txt"));
-        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "4.txt"));
-        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "5.txt"));
+        final int files = 4;
+        for (int fileNumber = 1; fileNumber <= files; fileNumber++) {
+            final String filename = Integer.toString(fileNumber);
+            runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), filename));
+        }
 
         runner.run();
-        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, BATCH_SIZE);
         runner.clearTransferState();
 
         runner.run();
-        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
+        runner.assertTransferCount(PutSFTP.REL_SUCCESS, BATCH_SIZE);
+        runner.assertQueueEmpty();
+    }
 
-        runner.clearTransferState();
+    @Test
+    void testRunTransitUri() {
+        runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
 
         runner.run();
+
         runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
-        runner.clearTransferState();
+
+        final List<ProvenanceEventRecord> records = runner.getProvenanceEvents();
+        assertFalse(records.isEmpty());
+
+        final ProvenanceEventRecord record = records.iterator().next();
+        final String firstTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST);
+        assertTrue(record.getTransitUri().startsWith(firstTransitUri), "Transit URI not found");
     }
 
     @Test
-    void testRunTransitUri() {
-        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
-        Map<String,String> attributes = new HashMap<>();
-        attributes.put("filename", "testfile.txt");
-        attributes.put("transfer-host","localhost");
-        runner.enqueue(FLOW_FILE_CONTENTS, attributes);
+    void testRunTransitUriDifferentHosts() {
+        runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.FALSE.toString());
+        runner.setProperty(SFTPTransfer.HOSTNAME, "${transfer-host}");
+
+        final Map<String, String> firstAttributes = new LinkedHashMap<>();
+        firstAttributes.put(CoreAttributes.FILENAME.key(), FIRST_FILENAME);
+        firstAttributes.put(TRANSFER_HOST_ATTRIBUTE, LOCALHOST);
+        runner.enqueue(FLOW_FILE_CONTENTS, firstAttributes);
 
-        attributes = new HashMap<>();
-        attributes.put("filename", "testfile1.txt");
-        attributes.put("transfer-host","127.0.0.1");
+        final Map<String, String> secondAttributes = new LinkedHashMap<>();
+        secondAttributes.put(CoreAttributes.FILENAME.key(), FIRST_FILENAME);
+        secondAttributes.put(TRANSFER_HOST_ATTRIBUTE, LOCALHOST_ADDRESS);
+        runner.enqueue(FLOW_FILE_CONTENTS, secondAttributes);
 
-        runner.enqueue(FLOW_FILE_CONTENTS, attributes);
         runner.run();
 
         runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-        runner.getProvenanceEvents().forEach(k -> assertTrue(k.getTransitUri().contains("sftp://localhost")));
-        //Two files in batch, should have 2 transferred to success, 0 to failure
-        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-        runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
 
-        MockFlowFile flowFile1 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0);
-        MockFlowFile flowFile2 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1);
-        runner.clearProvenanceEvents();
-        runner.clearTransferState();
-
-        //Test different destinations on flow file attributes
-        runner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname
+        final List<ProvenanceEventRecord> records = runner.getProvenanceEvents();
 
-        runner.setThreadCount(1);
-        runner.enqueue(flowFile1);
-        runner.enqueue(flowFile2);
-        runner.run();
+        final String firstTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST);
+        final Optional<ProvenanceEventRecord> firstRecord = records.stream()
+                .filter(record -> record.getTransitUri().startsWith(firstTransitUri))
+                .findFirst();
+        assertTrue(firstRecord.isPresent(), "First Transit URI not found");
 
-        runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-        assertTrue(runner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost"));
-        assertTrue(runner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1"));
+        final String secondTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST_ADDRESS);
+        final Optional<ProvenanceEventRecord> secondRecord = records.stream()
+                .filter(record -> record.getTransitUri().startsWith(secondTransitUri))
+                .findFirst();
+        assertTrue(secondRecord.isPresent(), "Second Transit URI not found");
+    }
 
-        runner.clearProvenanceEvents();
-        runner.clearTransferState();
+    private void createRemoteFile() throws IOException {
+        final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY);
+        final Path subDirectory = Paths.get(directory.toString(), FIRST_FILENAME);
+        Files.createDirectory(directory);
+        Files.createFile(subDirectory);
     }
-}
\ No newline at end of file
+}