You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/19 16:37:29 UTC

[nifi] branch master updated: NIFI-7336: Add tests for DeleteAzureDataLakeStorage

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new d6240a1  NIFI-7336: Add tests for DeleteAzureDataLakeStorage
d6240a1 is described below

commit d6240a1074aa33d650cb91dec3e69b7d5e3da9b5
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Thu May 7 16:44:25 2020 +0200

    NIFI-7336: Add tests for DeleteAzureDataLakeStorage
    
    DeleteAzureDataLakeStorage now throws an exception if fileSystem or fileName is an empty string
    
    NIFI-7336: Add tests for DeleteAzureDataLakeStorage - typos fixed
    NIFI-7336: Add tests for DeleteAzureDataLakeStorage - fixed a test case
    
    This closes #4272.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../azure/storage/DeleteAzureDataLakeStorage.java  |  29 +-
 .../azure/storage/FetchAzureDataLakeStorage.java   |   4 +-
 .../storage/ITDeleteAzureDataLakeStorage.java      | 382 ++++++++++++++++++++-
 3 files changed, 384 insertions(+), 31 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 86b23aa..314785a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -16,6 +16,11 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -27,11 +32,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 
-import com.azure.storage.file.datalake.DataLakeDirectoryClient;
-import com.azure.storage.file.datalake.DataLakeFileClient;
-import com.azure.storage.file.datalake.DataLakeFileSystemClient;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-
 import java.util.concurrent.TimeUnit;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@@ -43,7 +43,6 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
-
         if (flowFile == null) {
             return;
         }
@@ -53,11 +52,21 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
             final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
             final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
             final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
-            final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
-            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
-            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
 
+            if (StringUtils.isBlank(fileSystem)) {
+                throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
+                        FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
+            }
+            if (StringUtils.isBlank(fileName)) {
+                throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
+                        FILE.getDisplayName() + " must be specified as a non-empty string.");
+            }
+
+            final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
+            final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
             final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
+
             fileClient.delete();
             session.transfer(flowFile, REL_SUCCESS);
 
@@ -69,4 +78,4 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
             session.transfer(flowFile, REL_FAILURE);
         }
     }
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 2d80ea3..5af7988 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -63,8 +63,8 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
             }
 
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
-            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
-            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+            final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
             final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
 
             if (fileClient.getProperties().isDirectory()) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
index 29915eb..a07108b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
@@ -16,44 +16,388 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
-public class ITDeleteAzureDataLakeStorage extends AbstractAzureBlobStorageIT {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
 
     @Override
     protected Class<? extends Processor> getProcessorClass() {
         return DeleteAzureDataLakeStorage.class;
     }
 
-    @Before
-    public void setUp() {
-        runner.setProperty(DeleteAzureDataLakeStorage.FILE, TEST_FILE_NAME);
+    @Test
+    public void testDeleteFileFromRoot() {
+        // GIVEN
+        String directory= "";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        uploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
     }
 
-    @Ignore
     @Test
-    public void testDeleteFile() throws Exception {
-        runner.assertValid();
-        runner.enqueue(new byte[0]);
-        runner.run(1);
+    public void testDeleteFileFromDirectory() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
+    }
+
+    @Test
+    public void testDeleteFileFromDeepDirectory() {
+        // GIVEN
+        String directory= "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
+                + "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
+                + "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
+    }
+
+    @Test
+    public void testDeleteFileWithWhitespaceInFilename() {
+        // GIVEN
+        String directory= "TestDirectory";
+        String filename = "A test file.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
+    }
+
+    @Test
+    public void testDeleteFileWithWhitespaceInDirectoryName() {
+        // GIVEN
+        String directory= "A Test Directory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
+    }
+
+    @Test
+    public void testDeleteEmptyDirectory() {
+        // GIVEN
+        String parentDirectory = "ParentDirectory";
+        String childDirectory = "ChildDirectory";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        fileSystemClient.createDirectory(parentDirectory + "/" + childDirectory);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, inputFlowFileContent);
+    }
+
+    @Test
+    public void testDeleteFileCaseSensitiveFilename() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename1 = "testFile.txt";
+        String filename2 = "testfile.txt";
+        String fileContent1 = "ContentOfFile1";
+        String fileContent2 = "ContentOfFile2";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename1, fileContent1);
+        uploadFile(directory, filename2, fileContent2);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, directory, filename1, inputFlowFileContent, inputFlowFileContent);
+        assertTrue(fileExists(directory, filename2));
+    }
+
+    @Test
+    public void testDeleteUsingExpressionLanguage() {
+        // GIVEN
+        String expLangFileSystem = "az.filesystem";
+        String expLangDirectory = "az.directory";
+        String expLangFilename = "az.filename";
+
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(expLangFileSystem, fileSystemName);
+        attributes.put(expLangDirectory, directory);
+        attributes.put(expLangFilename, filename);
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDeleteWithExpressionLanguage("${" + expLangFileSystem + "}",
+                "${" + expLangDirectory + "}",
+                "${" + expLangFilename + "}",
+                attributes,
+                inputFlowFileContent,
+                inputFlowFileContent,
+                directory,
+                filename);
+    }
+
+    @Test
+    public void testDeleteUsingExpressionLanguageFileSystemIsNotSpecified() {
+        // GIVEN
+        String expLangFileSystem = "az.filesystem";
+        String expLangDirectory = "az.directory";
+        String expLangFilename = "az.filename";
+
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(expLangDirectory, directory);
+        attributes.put(expLangFilename, filename);
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testFailedDeleteWithProcessException("${" + expLangFileSystem + "}",
+                "${" + expLangDirectory + "}",
+                "${" + expLangFilename + "}",
+                attributes,
+                inputFlowFileContent,
+                inputFlowFileContent);
+        assertTrue(fileExists(directory, filename));
+    }
+
+    @Test
+    public void testDeleteUsingExpressionLanguageFilenameIsNotSpecified() {
+        // GIVEN
+        String expLangFileSystem = "az.filesystem";
+        String expLangDirectory = "az.directory";
+        String expLangFilename = "az.filename";
+
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+
+        String inputFlowFileContent = "InputFlowFileContent";
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(expLangFileSystem, fileSystemName);
+        attributes.put(expLangDirectory, directory);
 
-        assertResult();
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testFailedDeleteWithProcessException("${" + expLangFileSystem + "}",
+                "${" + expLangDirectory + "}",
+                "${" + expLangFilename + "}",
+                attributes,
+                inputFlowFileContent,
+                inputFlowFileContent);
+        assertTrue(fileExists(directory, filename));
+    }
+
+    @Test
+    public void testDeleteNonExistentFile() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        fileSystemClient.createDirectory(directory);
+
+        // WHEN
+        // THEN
+        testFailedDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
+        assertTrue(fileExists("", directory));
+    }
+
+    @Test
+    public void testDeleteFileFromNonExistentDirectory() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        // WHEN
+        // THEN
+        testFailedDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
+    }
+
+    @Test
+    public void testDeleteFileFromNonExistentFileSystem() {
+        // GIVEN
+        String fileSystem = "NonExistentFileSystem";
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        // WHEN
+        // THEN
+        testFailedDelete(fileSystem, directory, filename, inputFlowFileContent, inputFlowFileContent, 400);
     }
 
+    @Test
+    public void testDeleteNonEmptyDirectory() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        testFailedDelete(fileSystemName, "", directory, inputFlowFileContent, inputFlowFileContent, 409);
+        assertTrue(fileExists(directory, filename));
+    }
+
+    @Test
+    public void testDeleteFileWithInvalidFilename() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String invalidFilename = "test/\\File.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testFailedDelete(fileSystemName, directory, invalidFilename, inputFlowFileContent, inputFlowFileContent, 400);
+        assertTrue(fileExists(directory, filename));
+    }
+
+    private void testSuccessfulDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
+        testSuccessfulDeleteWithExpressionLanguage(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent,
+                                                   directory, filename);
+    }
+
+    private void testSuccessfulDeleteWithExpressionLanguage(String expLangFileSystem, String expLangDirectory, String expLangFilename, Map<String, String> attributes,
+                                                            String inputFlowFileContent, String expectedFlowFileContent, String directory, String filename) {
+        // GIVEN
+        int expectedNumberOfProvenanceEvents = 1;
+        ProvenanceEventType expectedEventType = ProvenanceEventType.REMOTE_INVOCATION;
+
+        setRunnerProperties(expLangFileSystem, expLangDirectory, expLangFilename);
 
-    private void assertResult() throws Exception {
+        // WHEN
+        startRunner(inputFlowFileContent, attributes);
+
+        // THEN
+        assertSuccess(directory, filename, expectedFlowFileContent, expectedNumberOfProvenanceEvents, expectedEventType);
+    }
+
+    private void testFailedDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent, int expectedErrorCode) {
+        // GIVEN
+        setRunnerProperties(fileSystem, directory, filename);
+
+        // WHEN
+        startRunner(inputFlowFileContent, Collections.emptyMap());
+
+        // THEN
+        DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable();
+        assertEquals(expectedErrorCode, e.getStatusCode());
+
+        assertFailure(expectedFlowFileContent);
+    }
+
+    private void testFailedDeleteWithProcessException(String fileSystem, String directory, String filename, Map<String, String> attributes,
+                                                      String inputFlowFileContent, String expectedFlowFileContent) {
+        // GIVEN
+        setRunnerProperties(fileSystem, directory, filename);
+
+        // WHEN
+        startRunner(inputFlowFileContent, attributes);
+
+        // THEN
+        Throwable exception = runner.getLogger().getErrorMessages().get(0).getThrowable();
+        assertEquals(ProcessException.class, exception.getClass());
+
+        assertFailure(expectedFlowFileContent);
+    }
+
+    private boolean fileExists(String directory, String filename) {
+        DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
+        DataLakeFileClient fileClient = directoryClient.getFileClient(filename);
+
+        return fileClient.exists();
+    }
+
+    private void setRunnerProperties(String fileSystem, String directory, String filename) {
+        runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem);
+        runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory);
+        runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
+        runner.assertValid();
+    }
+
+    private void startRunner(String inputFlowFileContent, Map<String, String> attributes) {
+        runner.enqueue(inputFlowFileContent, attributes);
+        runner.run();
+    }
+
+    private void assertSuccess(String directory, String filename, String expectedFlowFileContent, int expectedNumberOfProvenanceEvents, ProvenanceEventType expectedEventType) {
         runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_SUCCESS, 1);
-        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS);
-        for (MockFlowFile flowFile : flowFilesForRelationship) {
-            flowFile.assertContentEquals("0123456789".getBytes());
-            flowFile.assertAttributeEquals("azure.length", "10");
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(expectedFlowFileContent);
+
+        int actualNumberOfProvenanceEvents = runner.getProvenanceEvents().size();
+        assertEquals(expectedNumberOfProvenanceEvents, actualNumberOfProvenanceEvents);
+
+        ProvenanceEventType actualEventType = runner.getProvenanceEvents().get(0).getEventType();
+        assertEquals(expectedEventType, actualEventType);
 
-        }
+        assertFalse(fileExists(directory, filename));
     }
+
+    private void assertFailure(String expectedFlowFileContent) {
+        runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_FAILURE, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_FAILURE).get(0);
+        flowFile.assertContentEquals(expectedFlowFileContent);
+    }
+
 }
\ No newline at end of file