You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/08/05 11:23:10 UTC

[nifi] branch main updated: NIFI-7947: Add directory deletion functionality in DeleteAzureDataLakeStorage

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bf3b55  NIFI-7947: Add directory deletion functionality in DeleteAzureDataLakeStorage
9bf3b55 is described below

commit 9bf3b5503434e8a1dad5453b54a38dfaf079467c
Author: Lehel Boér <Le...@hotmail.com>
AuthorDate: Thu Jun 24 14:00:23 2021 +0200

    NIFI-7947: Add directory deletion functionality in DeleteAzureDataLakeStorage
    
    This closes #5190.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../AbstractAzureDataLakeStorageProcessor.java     | 17 ++--
 .../azure/storage/DeleteAzureDataLakeStorage.java  | 76 ++++++++++++++---
 .../storage/ITDeleteAzureDataLakeStorage.java      | 99 +++++++++++++++++++---
 3 files changed, 160 insertions(+), 32 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index e6ca356..fbbcf25 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -16,13 +16,6 @@
  */
 package org.apache.nifi.processors.azure;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import com.azure.core.credential.AccessToken;
 import com.azure.core.credential.TokenCredential;
 import com.azure.identity.ClientSecretCredential;
@@ -32,7 +25,6 @@ import com.azure.identity.ManagedIdentityCredentialBuilder;
 import com.azure.storage.common.StorageSharedKeyCredential;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -50,6 +42,13 @@ import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
 import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
 import reactor.core.publisher.Mono;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
 
 public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
@@ -134,7 +133,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
         final String servicePrincipalClientId = credentialsDetails.getServicePrincipalClientId();
         final String servicePrincipalClientSecret = credentialsDetails.getServicePrincipalClientSecret();
 
-        final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix);
+        final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix);
 
         final DataLakeServiceClient storageClient;
         if (StringUtils.isNotBlank(accountKey)) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 0e5128e..b929971 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -16,22 +16,33 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.util.Context;
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
 @SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@@ -39,33 +50,72 @@ import java.util.concurrent.TimeUnit;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
+    public static final AllowableValue FS_TYPE_FILE = new AllowableValue("file", "File", "The object to be deleted is a file.");
+    public static final AllowableValue FS_TYPE_DIRECTORY = new AllowableValue("directory", "Directory", "The object to be deleted is a directory.");
+
+    public static final PropertyDescriptor FILESYSTEM_OBJECT_TYPE = new PropertyDescriptor.Builder()
+            .name("filesystem-object-type")
+            .displayName("Filesystem Object Type")
+            .description("The type of the file system object to be deleted. It can be either directory or file.")
+            .allowableValues(FS_TYPE_FILE, FS_TYPE_DIRECTORY)
+            .required(true)
+            .defaultValue(FS_TYPE_FILE.toString())
+            .build();
+
+    public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
+            .name("file-name").displayName("File Name")
+            .description("The filename")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
+            .dependsOn(FILESYSTEM_OBJECT_TYPE, FS_TYPE_FILE)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            ADLS_CREDENTIALS_SERVICE,
+            FILESYSTEM,
+            FILESYSTEM_OBJECT_TYPE,
+            DIRECTORY,
+            FILE
+    ));
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
-
-        final long startNanos = System.nanoTime();
         try {
-            final String fileSystem = evaluateFileSystemProperty(context, flowFile);
-            final String directory = evaluateDirectoryProperty(context, flowFile);
-            final String fileName = evaluateFileNameProperty(context, flowFile);
-
+            final boolean isFile = context.getProperty(FILESYSTEM_OBJECT_TYPE).getValue().equals(FS_TYPE_FILE.getValue());
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
+
+            final String fileSystem = evaluateFileSystemProperty(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
-            final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
-            final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
 
-            fileClient.delete();
-            session.transfer(flowFile, REL_SUCCESS);
+            final String directory = evaluateDirectoryProperty(context, flowFile);
+            final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
 
-            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-            session.getProvenanceReporter().invokeRemoteProcess(flowFile, fileClient.getFileUrl(), "File deleted");
+            if (isFile) {
+                final String fileName = evaluateFileNameProperty(context, flowFile);
+                final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
+                fileClient.delete();
+                session.transfer(flowFile, REL_SUCCESS);
+                session.getProvenanceReporter().invokeRemoteProcess(flowFile, fileClient.getFileUrl(), "File deleted");
+            } else {
+                directoryClient.deleteWithResponse(true, new DataLakeRequestConditions(), Duration.ofSeconds(10), Context.NONE);
+                session.transfer(flowFile, REL_SUCCESS);
+                session.getProvenanceReporter().invokeRemoteProcess(flowFile, directoryClient.getDirectoryUrl(), "Directory deleted");
+            }
         } catch (Exception e) {
             getLogger().error("Failed to delete the specified file from Azure Data Lake Storage", e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
index a07108b..3b03237 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
@@ -29,6 +29,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage.FS_TYPE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage.FS_TYPE_FILE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -41,9 +43,73 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
     }
 
     @Test
+    public void testDeleteDirectoryWithFiles() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent);
+    }
+
+    @Test
+    public void testDeleteEmptyDirectoryWithFSTypeDirectory() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectory(directory);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent);
+    }
+
+    @Test
+    public void testDeleteSubdirectory() {
+        // GIVEN
+        String parentDirectory = "TestParentDirectory";
+        String childDirectory = "TestParentDirectory/TestChildDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectory(parentDirectory);
+        createDirectoryAndUploadFile(childDirectory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, childDirectory, null, inputFlowFileContent, inputFlowFileContent);
+        assertTrue(directoryExists(parentDirectory));
+    }
+
+    @Test
+    public void testDeleteParentDirectory() {
+        // GIVEN
+        String parentDirectory = "TestParentDirectory";
+        String childDirectory = "TestParentDirectory/TestChildDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectory(parentDirectory);
+        createDirectoryAndUploadFile(childDirectory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulDelete(fileSystemName, parentDirectory, null, inputFlowFileContent, inputFlowFileContent);
+        assertFalse(directoryExists(childDirectory));
+    }
+
+    @Test
     public void testDeleteFileFromRoot() {
         // GIVEN
-        String directory= "";
+        String directory = "";
         String filename = "testFile.txt";
         String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
@@ -73,7 +139,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
     @Test
     public void testDeleteFileFromDeepDirectory() {
         // GIVEN
-        String directory= "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
+        String directory = "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
                 + "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
                 + "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
         String filename = "testFile.txt";
@@ -90,7 +156,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
     @Test
     public void testDeleteFileWithWhitespaceInFilename() {
         // GIVEN
-        String directory= "TestDirectory";
+        String directory = "TestDirectory";
         String filename = "A test file.txt";
         String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
@@ -105,7 +171,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
     @Test
     public void testDeleteFileWithWhitespaceInDirectoryName() {
         // GIVEN
-        String directory= "A Test Directory";
+        String directory = "A Test Directory";
         String filename = "testFile.txt";
         String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
@@ -118,7 +184,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
     }
 
     @Test
-    public void testDeleteEmptyDirectory() {
+    public void testDeleteEmptyDirectoryWithFSTypeFile() {
         // GIVEN
         String parentDirectory = "ParentDirectory";
         String childDirectory = "ChildDirectory";
@@ -282,7 +348,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
     }
 
     @Test
-    public void testDeleteNonEmptyDirectory() {
+    public void testDeleteNonEmptyDirectoryWithFSTypeFile() {
         // GIVEN
         String directory = "TestDirectory";
         String filename = "testFile.txt";
@@ -314,7 +380,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
 
     private void testSuccessfulDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
         testSuccessfulDeleteWithExpressionLanguage(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent,
-                                                   directory, filename);
+                directory, filename);
     }
 
     private void testSuccessfulDeleteWithExpressionLanguage(String expLangFileSystem, String expLangDirectory, String expLangFilename, Map<String, String> attributes,
@@ -340,7 +406,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         startRunner(inputFlowFileContent, Collections.emptyMap());
 
         // THEN
-        DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable();
+        DataLakeStorageException e = (DataLakeStorageException) runner.getLogger().getErrorMessages().get(0).getThrowable();
         assertEquals(expectedErrorCode, e.getStatusCode());
 
         assertFailure(expectedFlowFileContent);
@@ -368,10 +434,19 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         return fileClient.exists();
     }
 
+    private boolean directoryExists(String directory) {
+        DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
+
+        return directoryClient.exists();
+    }
+
     private void setRunnerProperties(String fileSystem, String directory, String filename) {
+        runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM_OBJECT_TYPE, filename != null ? FS_TYPE_FILE.getValue() : FS_TYPE_DIRECTORY.getValue());
         runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem);
         runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory);
-        runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
+        if (filename != null) {
+            runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
+        }
         runner.assertValid();
     }
 
@@ -391,7 +466,11 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         ProvenanceEventType actualEventType = runner.getProvenanceEvents().get(0).getEventType();
         assertEquals(expectedEventType, actualEventType);
 
-        assertFalse(fileExists(directory, filename));
+        if (filename != null) {
+            assertFalse(fileExists(directory, filename));
+        } else {
+            assertFalse(directoryExists(directory));
+        }
     }
 
     private void assertFailure(String expectedFlowFileContent) {