You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/08/05 11:23:10 UTC
[nifi] branch main updated: NIFI-7947: Add directory deletion
functionality in DeleteAzureDataLakeStorage
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9bf3b55 NIFI-7947: Add directory deletion functionality in DeleteAzureDataLakeStorage
9bf3b55 is described below
commit 9bf3b5503434e8a1dad5453b54a38dfaf079467c
Author: Lehel Boér <Le...@hotmail.com>
AuthorDate: Thu Jun 24 14:00:23 2021 +0200
NIFI-7947: Add directory deletion functionality in DeleteAzureDataLakeStorage
This closes #5190.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../AbstractAzureDataLakeStorageProcessor.java | 17 ++--
.../azure/storage/DeleteAzureDataLakeStorage.java | 76 ++++++++++++++---
.../storage/ITDeleteAzureDataLakeStorage.java | 99 +++++++++++++++++++---
3 files changed, 160 insertions(+), 32 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index e6ca356..fbbcf25 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -16,13 +16,6 @@
*/
package org.apache.nifi.processors.azure;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.identity.ClientSecretCredential;
@@ -32,7 +25,6 @@ import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -50,6 +42,13 @@ import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import reactor.core.publisher.Mono;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
@@ -134,7 +133,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
final String servicePrincipalClientId = credentialsDetails.getServicePrincipalClientId();
final String servicePrincipalClientSecret = credentialsDetails.getServicePrincipalClientSecret();
- final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix);
+ final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix);
final DataLakeServiceClient storageClient;
if (StringUtils.isNotBlank(accountKey)) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 0e5128e..b929971 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -16,22 +16,33 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.core.util.Context;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@@ -39,33 +50,72 @@ import java.util.concurrent.TimeUnit;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+ public static final AllowableValue FS_TYPE_FILE = new AllowableValue("file", "File", "The object to be deleted is a file.");
+ public static final AllowableValue FS_TYPE_DIRECTORY = new AllowableValue("directory", "Directory", "The object to be deleted is a directory.");
+
+ public static final PropertyDescriptor FILESYSTEM_OBJECT_TYPE = new PropertyDescriptor.Builder()
+ .name("filesystem-object-type")
+ .displayName("Filesystem Object Type")
+ .description("The type of the file system object to be deleted. It can be either directory or file.")
+ .allowableValues(FS_TYPE_FILE, FS_TYPE_DIRECTORY)
+ .required(true)
+ .defaultValue(FS_TYPE_FILE.toString())
+ .build();
+
+ public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
+ .name("file-name").displayName("File Name")
+ .description("The filename")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
+ .dependsOn(FILESYSTEM_OBJECT_TYPE, FS_TYPE_FILE)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ ADLS_CREDENTIALS_SERVICE,
+ FILESYSTEM,
+ FILESYSTEM_OBJECT_TYPE,
+ DIRECTORY,
+ FILE
+ ));
+
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
-
- final long startNanos = System.nanoTime();
try {
- final String fileSystem = evaluateFileSystemProperty(context, flowFile);
- final String directory = evaluateDirectoryProperty(context, flowFile);
- final String fileName = evaluateFileNameProperty(context, flowFile);
-
+ final boolean isFile = context.getProperty(FILESYSTEM_OBJECT_TYPE).getValue().equals(FS_TYPE_FILE.getValue());
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
+
+ final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
- final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
- final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
- fileClient.delete();
- session.transfer(flowFile, REL_SUCCESS);
+ final String directory = evaluateDirectoryProperty(context, flowFile);
+ final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
- final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().invokeRemoteProcess(flowFile, fileClient.getFileUrl(), "File deleted");
+ if (isFile) {
+ final String fileName = evaluateFileNameProperty(context, flowFile);
+ final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
+ fileClient.delete();
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().invokeRemoteProcess(flowFile, fileClient.getFileUrl(), "File deleted");
+ } else {
+ directoryClient.deleteWithResponse(true, new DataLakeRequestConditions(), Duration.ofSeconds(10), Context.NONE);
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().invokeRemoteProcess(flowFile, directoryClient.getDirectoryUrl(), "Directory deleted");
+ }
} catch (Exception e) {
getLogger().error("Failed to delete the specified file from Azure Data Lake Storage", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
index a07108b..3b03237 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
@@ -29,6 +29,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage.FS_TYPE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage.FS_TYPE_FILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -41,9 +43,73 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
+ public void testDeleteDirectoryWithFiles() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent);
+ }
+
+ @Test
+ public void testDeleteEmptyDirectoryWithFSTypeDirectory() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectory(directory);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent);
+ }
+
+ @Test
+ public void testDeleteSubdirectory() {
+ // GIVEN
+ String parentDirectory = "TestParentDirectory";
+ String childDirectory = "TestParentDirectory/TestChildDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectory(parentDirectory);
+ createDirectoryAndUploadFile(childDirectory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, childDirectory, null, inputFlowFileContent, inputFlowFileContent);
+ assertTrue(directoryExists(parentDirectory));
+ }
+
+ @Test
+ public void testDeleteParentDirectory() {
+ // GIVEN
+ String parentDirectory = "TestParentDirectory";
+ String childDirectory = "TestParentDirectory/TestChildDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectory(parentDirectory);
+ createDirectoryAndUploadFile(childDirectory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, parentDirectory, null, inputFlowFileContent, inputFlowFileContent);
+ assertFalse(directoryExists(childDirectory));
+ }
+
+ @Test
public void testDeleteFileFromRoot() {
// GIVEN
- String directory= "";
+ String directory = "";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
@@ -73,7 +139,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
@Test
public void testDeleteFileFromDeepDirectory() {
// GIVEN
- String directory= "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
+ String directory = "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
+ "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
+ "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
String filename = "testFile.txt";
@@ -90,7 +156,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
@Test
public void testDeleteFileWithWhitespaceInFilename() {
// GIVEN
- String directory= "TestDirectory";
+ String directory = "TestDirectory";
String filename = "A test file.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
@@ -105,7 +171,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
@Test
public void testDeleteFileWithWhitespaceInDirectoryName() {
// GIVEN
- String directory= "A Test Directory";
+ String directory = "A Test Directory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
@@ -118,7 +184,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
- public void testDeleteEmptyDirectory() {
+ public void testDeleteEmptyDirectoryWithFSTypeFile() {
// GIVEN
String parentDirectory = "ParentDirectory";
String childDirectory = "ChildDirectory";
@@ -282,7 +348,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
- public void testDeleteNonEmptyDirectory() {
+ public void testDeleteNonEmptyDirectoryWithFSTypeFile() {
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
@@ -314,7 +380,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
private void testSuccessfulDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
testSuccessfulDeleteWithExpressionLanguage(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent,
- directory, filename);
+ directory, filename);
}
private void testSuccessfulDeleteWithExpressionLanguage(String expLangFileSystem, String expLangDirectory, String expLangFilename, Map<String, String> attributes,
@@ -340,7 +406,7 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
startRunner(inputFlowFileContent, Collections.emptyMap());
// THEN
- DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable();
+ DataLakeStorageException e = (DataLakeStorageException) runner.getLogger().getErrorMessages().get(0).getThrowable();
assertEquals(expectedErrorCode, e.getStatusCode());
assertFailure(expectedFlowFileContent);
@@ -368,10 +434,19 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
return fileClient.exists();
}
+ private boolean directoryExists(String directory) {
+ DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
+
+ return directoryClient.exists();
+ }
+
private void setRunnerProperties(String fileSystem, String directory, String filename) {
+ runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM_OBJECT_TYPE, filename != null ? FS_TYPE_FILE.getValue() : FS_TYPE_DIRECTORY.getValue());
runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem);
runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory);
- runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
+ if (filename != null) {
+ runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
+ }
runner.assertValid();
}
@@ -391,7 +466,11 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
ProvenanceEventType actualEventType = runner.getProvenanceEvents().get(0).getEventType();
assertEquals(expectedEventType, actualEventType);
- assertFalse(fileExists(directory, filename));
+ if (filename != null) {
+ assertFalse(fileExists(directory, filename));
+ } else {
+ assertFalse(directoryExists(directory));
+ }
}
private void assertFailure(String expectedFlowFileContent) {