You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/19 16:37:29 UTC
[nifi] branch master updated: NIFI-7336: Add tests for
DeleteAzureDataLakeStorage
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new d6240a1 NIFI-7336: Add tests for DeleteAzureDataLakeStorage
d6240a1 is described below
commit d6240a1074aa33d650cb91dec3e69b7d5e3da9b5
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Thu May 7 16:44:25 2020 +0200
NIFI-7336: Add tests for DeleteAzureDataLakeStorage
DeleteAzureDataLakeStorage now throws exception if fileSystem or fileName is empty string
NIFI-7336: Add tests for DeleteAzureDataLakeStorage - typos fixed
NIFI-7336: Add tests for DeleteAzureDataLakeStorage - fixed a test case
This closes #4272.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../azure/storage/DeleteAzureDataLakeStorage.java | 29 +-
.../azure/storage/FetchAzureDataLakeStorage.java | 4 +-
.../storage/ITDeleteAzureDataLakeStorage.java | 382 ++++++++++++++++++++-
3 files changed, 384 insertions(+), 31 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 86b23aa..314785a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -16,6 +16,11 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -27,11 +32,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
-import com.azure.storage.file.datalake.DataLakeDirectoryClient;
-import com.azure.storage.file.datalake.DataLakeFileClient;
-import com.azure.storage.file.datalake.DataLakeFileSystemClient;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@@ -43,7 +43,6 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
-
if (flowFile == null) {
return;
}
@@ -53,11 +52,21 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
- final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
- final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
- final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+ if (StringUtils.isBlank(fileSystem)) {
+ throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
+ FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
+ }
+ if (StringUtils.isBlank(fileName)) {
+ throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
+ FILE.getDisplayName() + " must be specified as a non-empty string.");
+ }
+
+ final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
+ final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
+ final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
+
fileClient.delete();
session.transfer(flowFile, REL_SUCCESS);
@@ -69,4 +78,4 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
session.transfer(flowFile, REL_FAILURE);
}
}
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 2d80ea3..5af7988 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -63,8 +63,8 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
}
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
- final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
- final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+ final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
+ final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
if (fileClient.getProperties().isDirectory()) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
index 29915eb..a07108b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
@@ -16,44 +16,388 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
-import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
-public class ITDeleteAzureDataLakeStorage extends AbstractAzureBlobStorageIT {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return DeleteAzureDataLakeStorage.class;
}
- @Before
- public void setUp() {
- runner.setProperty(DeleteAzureDataLakeStorage.FILE, TEST_FILE_NAME);
+ @Test
+ public void testDeleteFileFromRoot() {
+ // GIVEN
+ String directory= "";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ uploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
}
- @Ignore
@Test
- public void testDeleteFile() throws Exception {
- runner.assertValid();
- runner.enqueue(new byte[0]);
- runner.run(1);
+ public void testDeleteFileFromDirectory() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
+ }
+
+ @Test
+ public void testDeleteFileFromDeepDirectory() {
+ // GIVEN
+ String directory= "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
+ + "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
+ + "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
+ }
+
+ @Test
+ public void testDeleteFileWithWhitespaceInFilename() {
+ // GIVEN
+ String directory= "TestDirectory";
+ String filename = "A test file.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
+ }
+
+ @Test
+ public void testDeleteFileWithWhitespaceInDirectoryName() {
+ // GIVEN
+ String directory= "A Test Directory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
+ }
+
+ @Test
+ public void testDeleteEmptyDirectory() {
+ // GIVEN
+ String parentDirectory = "ParentDirectory";
+ String childDirectory = "ChildDirectory";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ fileSystemClient.createDirectory(parentDirectory + "/" + childDirectory);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, inputFlowFileContent);
+ }
+
+ @Test
+ public void testDeleteFileCaseSensitiveFilename() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename1 = "testFile.txt";
+ String filename2 = "testfile.txt";
+ String fileContent1 = "ContentOfFile1";
+ String fileContent2 = "ContentOfFile2";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename1, fileContent1);
+ uploadFile(directory, filename2, fileContent2);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, filename1, inputFlowFileContent, inputFlowFileContent);
+ assertTrue(fileExists(directory, filename2));
+ }
+
+ @Test
+ public void testDeleteUsingExpressionLanguage() {
+ // GIVEN
+ String expLangFileSystem = "az.filesystem";
+ String expLangDirectory = "az.directory";
+ String expLangFilename = "az.filename";
+
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(expLangFileSystem, fileSystemName);
+ attributes.put(expLangDirectory, directory);
+ attributes.put(expLangFilename, filename);
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDeleteWithExpressionLanguage("${" + expLangFileSystem + "}",
+ "${" + expLangDirectory + "}",
+ "${" + expLangFilename + "}",
+ attributes,
+ inputFlowFileContent,
+ inputFlowFileContent,
+ directory,
+ filename);
+ }
+
+ @Test
+ public void testDeleteUsingExpressionLanguageFileSystemIsNotSpecified() {
+ // GIVEN
+ String expLangFileSystem = "az.filesystem";
+ String expLangDirectory = "az.directory";
+ String expLangFilename = "az.filename";
+
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(expLangDirectory, directory);
+ attributes.put(expLangFilename, filename);
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testFailedDeleteWithProcessException("${" + expLangFileSystem + "}",
+ "${" + expLangDirectory + "}",
+ "${" + expLangFilename + "}",
+ attributes,
+ inputFlowFileContent,
+ inputFlowFileContent);
+ assertTrue(fileExists(directory, filename));
+ }
+
+ @Test
+ public void testDeleteUsingExpressionLanguageFilenameIsNotSpecified() {
+ // GIVEN
+ String expLangFileSystem = "az.filesystem";
+ String expLangDirectory = "az.directory";
+ String expLangFilename = "az.filename";
+
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+
+ String inputFlowFileContent = "InputFlowFileContent";
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(expLangFileSystem, fileSystemName);
+ attributes.put(expLangDirectory, directory);
- assertResult();
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testFailedDeleteWithProcessException("${" + expLangFileSystem + "}",
+ "${" + expLangDirectory + "}",
+ "${" + expLangFilename + "}",
+ attributes,
+ inputFlowFileContent,
+ inputFlowFileContent);
+ assertTrue(fileExists(directory, filename));
+ }
+
+ @Test
+ public void testDeleteNonExistentFile() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ fileSystemClient.createDirectory(directory);
+
+ // WHEN
+ // THEN
+ testFailedDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
+ assertTrue(fileExists("", directory));
+ }
+
+ @Test
+ public void testDeleteFileFromNonExistentDirectory() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ // WHEN
+ // THEN
+ testFailedDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
+ }
+
+ @Test
+ public void testDeleteFileFromNonExistentFileSystem() {
+ // GIVEN
+ String fileSystem = "NonExistentFileSystem";
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ // WHEN
+ // THEN
+ testFailedDelete(fileSystem, directory, filename, inputFlowFileContent, inputFlowFileContent, 400);
}
+ @Test
+ public void testDeleteNonEmptyDirectory() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ testFailedDelete(fileSystemName, "", directory, inputFlowFileContent, inputFlowFileContent, 409);
+ assertTrue(fileExists(directory, filename));
+ }
+
+ @Test
+ public void testDeleteFileWithInvalidFilename() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String invalidFilename = "test/\\File.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testFailedDelete(fileSystemName, directory, invalidFilename, inputFlowFileContent, inputFlowFileContent, 400);
+ assertTrue(fileExists(directory, filename));
+ }
+
+ private void testSuccessfulDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
+ testSuccessfulDeleteWithExpressionLanguage(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent,
+ directory, filename);
+ }
+
+ private void testSuccessfulDeleteWithExpressionLanguage(String expLangFileSystem, String expLangDirectory, String expLangFilename, Map<String, String> attributes,
+ String inputFlowFileContent, String expectedFlowFileContent, String directory, String filename) {
+ // GIVEN
+ int expectedNumberOfProvenanceEvents = 1;
+ ProvenanceEventType expectedEventType = ProvenanceEventType.REMOTE_INVOCATION;
+
+ setRunnerProperties(expLangFileSystem, expLangDirectory, expLangFilename);
- private void assertResult() throws Exception {
+ // WHEN
+ startRunner(inputFlowFileContent, attributes);
+
+ // THEN
+ assertSuccess(directory, filename, expectedFlowFileContent, expectedNumberOfProvenanceEvents, expectedEventType);
+ }
+
+ private void testFailedDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent, int expectedErrorCode) {
+ // GIVEN
+ setRunnerProperties(fileSystem, directory, filename);
+
+ // WHEN
+ startRunner(inputFlowFileContent, Collections.emptyMap());
+
+ // THEN
+ DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable();
+ assertEquals(expectedErrorCode, e.getStatusCode());
+
+ assertFailure(expectedFlowFileContent);
+ }
+
+ private void testFailedDeleteWithProcessException(String fileSystem, String directory, String filename, Map<String, String> attributes,
+ String inputFlowFileContent, String expectedFlowFileContent) {
+ // GIVEN
+ setRunnerProperties(fileSystem, directory, filename);
+
+ // WHEN
+ startRunner(inputFlowFileContent, attributes);
+
+ // THEN
+ Throwable exception = runner.getLogger().getErrorMessages().get(0).getThrowable();
+ assertEquals(ProcessException.class, exception.getClass());
+
+ assertFailure(expectedFlowFileContent);
+ }
+
+ private boolean fileExists(String directory, String filename) {
+ DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
+ DataLakeFileClient fileClient = directoryClient.getFileClient(filename);
+
+ return fileClient.exists();
+ }
+
+ private void setRunnerProperties(String fileSystem, String directory, String filename) {
+ runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem);
+ runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory);
+ runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
+ runner.assertValid();
+ }
+
+ private void startRunner(String inputFlowFileContent, Map<String, String> attributes) {
+ runner.enqueue(inputFlowFileContent, attributes);
+ runner.run();
+ }
+
+ private void assertSuccess(String directory, String filename, String expectedFlowFileContent, int expectedNumberOfProvenanceEvents, ProvenanceEventType expectedEventType) {
runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_SUCCESS, 1);
- List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS);
- for (MockFlowFile flowFile : flowFilesForRelationship) {
- flowFile.assertContentEquals("0123456789".getBytes());
- flowFile.assertAttributeEquals("azure.length", "10");
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(expectedFlowFileContent);
+
+ int actualNumberOfProvenanceEvents = runner.getProvenanceEvents().size();
+ assertEquals(expectedNumberOfProvenanceEvents, actualNumberOfProvenanceEvents);
+
+ ProvenanceEventType actualEventType = runner.getProvenanceEvents().get(0).getEventType();
+ assertEquals(expectedEventType, actualEventType);
- }
+ assertFalse(fileExists(directory, filename));
}
+
+ private void assertFailure(String expectedFlowFileContent) {
+ runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_FAILURE, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_FAILURE).get(0);
+ flowFile.assertContentEquals(expectedFlowFileContent);
+ }
+
}
\ No newline at end of file