You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/12/02 09:08:27 UTC
[nifi] branch main updated: NIFI-10925 Refactored TestPutSFTP with discrete test methods
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c51411d360 NIFI-10925 Refactored TestPutSFTP with discrete test methods
c51411d360 is described below
commit c51411d360a8354a003bce3bd80a5cdec0d89d08
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Dec 1 14:44:32 2022 -0600
NIFI-10925 Refactored TestPutSFTP with discrete test methods
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #6746.
---
.../nifi/processors/standard/TestPutSFTP.java | 213 +++++++++++----------
1 file changed, 107 insertions(+), 106 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
index c5ff8f327d..e10f42b851 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java
@@ -21,7 +21,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.processors.standard.util.SSHTestServer;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
@@ -33,14 +33,33 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
class TestPutSFTP {
private static final String FLOW_FILE_CONTENTS = TestPutSFTP.class.getSimpleName();
+ private static final String LOCALHOST = "localhost";
+
+ private static final String LOCALHOST_ADDRESS = "127.0.0.1";
+
+ private static final String REMOTE_DIRECTORY = "nifi_test/";
+
+ private static final String FIRST_FILENAME = "1.txt";
+
+ private static final String TRANSFER_HOST_ATTRIBUTE = "transfer-host";
+
+ private static final int BATCH_SIZE = 2;
+
+ private static final byte[] ZERO_BYTES = new byte[]{};
+
+ private static final String TRANSIT_URI_FORMAT = "sftp://%s";
+
private SSHTestServer sshTestServer;
private TestRunner runner;
@@ -51,16 +70,16 @@ class TestPutSFTP {
sshTestServer.startServer();
runner = TestRunners.newTestRunner(PutSFTP.class);
- runner.setProperty(SFTPTransfer.HOSTNAME, "localhost");
+ runner.setProperty(SFTPTransfer.HOSTNAME, LOCALHOST);
runner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort()));
runner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername());
runner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword());
- runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false");
- runner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
- runner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/");
- runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
+ runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, Boolean.FALSE.toString());
+ runner.setProperty(SFTPTransfer.BATCH_SIZE, Integer.toString(BATCH_SIZE));
+ runner.setProperty(SFTPTransfer.REMOTE_PATH, REMOTE_DIRECTORY);
+ runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.TRUE.toString());
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
- runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true");
+ runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, Boolean.TRUE.toString());
runner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec");
runner.setValidateExpressionUsage(false);
}
@@ -74,176 +93,158 @@ class TestPutSFTP {
@Test
void testRunNewDirectory() {
- runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
+ runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
- //verify directory exists
- Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/");
- Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/1.txt");
+ Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY);
+ Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY + FIRST_FILENAME);
assertTrue(newDirectory.toAbsolutePath().toFile().exists(), "New Directory not created");
assertTrue(newFile.toAbsolutePath().toFile().exists(), "New File not created");
runner.clearTransferState();
}
@Test
- void testRunZeroByteFile() {
- runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
- runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt"));
-
+ void testRunZeroByteFileRejected() {
+ runner.enqueue(ZERO_BYTES, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
- //Two files in batch, should have only 1 transferred to sucess, 1 to failure
- runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
- runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
- runner.clearTransferState();
-
- runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
-
- runner.run();
-
- //One files in batch, should have 0 transferred to output since it's zero byte
runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
- runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
- runner.clearTransferState();
-
- //allow zero byte files
- runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
-
- runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
+ }
+ @Test
+ void testRunZeroByteFileAllowed() {
+ runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.FALSE.toString());
+ runner.enqueue(ZERO_BYTES, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
- //should have 1 transferred to sucess
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
-
- //revert settings
- runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true");
- runner.clearTransferState();
}
@Test
- void testRunConflictResolution() throws IOException {
- final String directoryName = "nifi_test";
- final String filename = "1";
+ void testRunConflictResolutionReplaceStrategy() throws IOException {
+ createRemoteFile();
- final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + directoryName );
- final Path subDirectory = Paths.get(directory.toString(), filename);
- Files.createDirectory(directory);
- Files.createFile(subDirectory);
-
- final Map<String, String> flowFileAttributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), filename);
-
- // REPLACE Strategy
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);
-
- runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+ runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
- runner.clearTransferState();
+ }
- // REJECT Strategy
- runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT);
+ @Test
+ void testRunConflictResolutionRejectStrategy() throws IOException {
+ createRemoteFile();
- runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+ runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT);
+ runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
runner.assertTransferCount(PutSFTP.REL_REJECT, 1);
runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
- runner.clearTransferState();
+ }
+
+ @Test
+ void testRunConflictResolutionIgnoreStrategy() throws IOException {
+ createRemoteFile();
- // IGNORE Strategy
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE);
- runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+ runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
runner.assertTransferCount(PutSFTP.REL_FAILURE, 0);
+ }
- runner.clearTransferState();
+ @Test
+ void testRunConflictResolutionFailStrategy() throws IOException {
+ createRemoteFile();
- // FAIL Strategy
runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL);
- runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes);
+ runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0);
runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
runner.assertTransferCount(PutSFTP.REL_FAILURE, 1);
-
- runner.clearTransferState();
}
@Test
void testRunBatching() {
- runner.setProperty(SFTPTransfer.BATCH_SIZE, "2");
-
- runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt"));
- runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt"));
- runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "3.txt"));
- runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "4.txt"));
- runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "5.txt"));
+ final int files = 4;
+ for (int fileNumber = 1; fileNumber <= files; fileNumber++) {
+ final String filename = Integer.toString(fileNumber);
+ runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), filename));
+ }
runner.run();
- runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
-
+ runner.assertTransferCount(PutSFTP.REL_SUCCESS, BATCH_SIZE);
runner.clearTransferState();
runner.run();
- runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
+ runner.assertTransferCount(PutSFTP.REL_SUCCESS, BATCH_SIZE);
+ runner.assertQueueEmpty();
+ }
- runner.clearTransferState();
+ @Test
+ void testRunTransitUri() {
+ runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME));
runner.run();
+
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1);
- runner.clearTransferState();
+
+ final List<ProvenanceEventRecord> records = runner.getProvenanceEvents();
+ assertFalse(records.isEmpty());
+
+ final ProvenanceEventRecord record = records.iterator().next();
+ final String firstTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST);
+ assertTrue(record.getTransitUri().startsWith(firstTransitUri), "Transit URI not found");
}
@Test
- void testRunTransitUri() {
- runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
- Map<String,String> attributes = new HashMap<>();
- attributes.put("filename", "testfile.txt");
- attributes.put("transfer-host","localhost");
- runner.enqueue(FLOW_FILE_CONTENTS, attributes);
+ void testRunTransitUriDifferentHosts() {
+ runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.FALSE.toString());
+ runner.setProperty(SFTPTransfer.HOSTNAME, "${transfer-host}");
+
+ final Map<String, String> firstAttributes = new LinkedHashMap<>();
+ firstAttributes.put(CoreAttributes.FILENAME.key(), FIRST_FILENAME);
+ firstAttributes.put(TRANSFER_HOST_ATTRIBUTE, LOCALHOST);
+ runner.enqueue(FLOW_FILE_CONTENTS, firstAttributes);
- attributes = new HashMap<>();
- attributes.put("filename", "testfile1.txt");
- attributes.put("transfer-host","127.0.0.1");
+ final Map<String, String> secondAttributes = new LinkedHashMap<>();
+ secondAttributes.put(CoreAttributes.FILENAME.key(), FIRST_FILENAME);
+ secondAttributes.put(TRANSFER_HOST_ATTRIBUTE, LOCALHOST_ADDRESS);
+ runner.enqueue(FLOW_FILE_CONTENTS, secondAttributes);
- runner.enqueue(FLOW_FILE_CONTENTS, attributes);
runner.run();
runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
- runner.getProvenanceEvents().forEach(k -> assertTrue(k.getTransitUri().contains("sftp://localhost")));
- //Two files in batch, should have 2 transferred to success, 0 to failure
- runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
- runner.assertTransferCount(PutSFTP.REL_REJECT, 0);
- MockFlowFile flowFile1 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0);
- MockFlowFile flowFile2 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1);
- runner.clearProvenanceEvents();
- runner.clearTransferState();
-
- //Test different destinations on flow file attributes
- runner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname
+ final List<ProvenanceEventRecord> records = runner.getProvenanceEvents();
- runner.setThreadCount(1);
- runner.enqueue(flowFile1);
- runner.enqueue(flowFile2);
- runner.run();
+ final String firstTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST);
+ final Optional<ProvenanceEventRecord> firstRecord = records.stream()
+ .filter(record -> record.getTransitUri().startsWith(firstTransitUri))
+ .findFirst();
+ assertTrue(firstRecord.isPresent(), "First Transit URI not found");
- runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
- assertTrue(runner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost"));
- assertTrue(runner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1"));
+ final String secondTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST_ADDRESS);
+ final Optional<ProvenanceEventRecord> secondRecord = records.stream()
+ .filter(record -> record.getTransitUri().startsWith(secondTransitUri))
+ .findFirst();
+ assertTrue(secondRecord.isPresent(), "Second Transit URI not found");
+ }
- runner.clearProvenanceEvents();
- runner.clearTransferState();
+ private void createRemoteFile() throws IOException {
+ final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY);
+ final Path subDirectory = Paths.get(directory.toString(), FIRST_FILENAME);
+ Files.createDirectory(directory);
+ Files.createFile(subDirectory);
}
-}
\ No newline at end of file
+}