Posted to commits@nifi.apache.org by tu...@apache.org on 2022/02/23 15:10:37 UTC
[nifi] branch main updated: NIFI-9657 Create MoveADLS processor
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a3e1f32 NIFI-9657 Create MoveADLS processor
a3e1f32 is described below
commit a3e1f32cae964df9dee47600016db9d2baba5d34
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Tue Feb 8 10:00:40 2022 +0100
NIFI-9657 Create MoveADLS processor
This closes #5752.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../AbstractAzureDataLakeStorageProcessor.java | 39 ++-
.../azure/storage/MoveAzureDataLakeStorage.java | 221 ++++++++++++
.../azure/storage/utils/ADLSAttributes.java | 6 +
.../services/org.apache.nifi.processor.Processor | 1 +
.../storage/AbstractAzureDataLakeStorageIT.java | 18 +-
.../azure/storage/ITMoveAzureDataLakeStorage.java | 370 +++++++++++++++++++++
6 files changed, 639 insertions(+), 16 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index fbbcf25..d371e12 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -175,19 +175,27 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
}
public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) {
- String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+ return evaluateFileSystemProperty(context, flowFile, FILESYSTEM);
+ }
+
+ public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
+ String fileSystem = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileSystem)) {
- throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILESYSTEM.getDisplayName()));
+ throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", property.getDisplayName()));
}
return fileSystem;
}
public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) {
- String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+ return evaluateDirectoryProperty(context, flowFile, DIRECTORY);
+ }
+
+ public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
+ String directory = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
if (directory.startsWith("/")) {
- throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", DIRECTORY.getDisplayName()));
+ throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", property.getDisplayName()));
} else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
- throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", DIRECTORY.getDisplayName()));
+ throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", property.getDisplayName()));
}
return directory;
}
@@ -200,19 +208,30 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
return fileName;
}
- private static class DirectoryValidator implements Validator {
- @Override
+ public static class DirectoryValidator implements Validator {
+ private String displayName;
+
+ public DirectoryValidator() {
+ this.displayName = null;
+ }
+
+ public DirectoryValidator(String displayName) {
+ this.displayName = displayName;
+ }
+
+ @Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
+ displayName = displayName == null ? DIRECTORY.getDisplayName() : displayName;
ValidationResult.Builder builder = new ValidationResult.Builder()
- .subject(DIRECTORY.getDisplayName())
+ .subject(displayName)
.input(input);
if (context.isExpressionLanguagePresent(input)) {
builder.valid(true).explanation("Expression Language Present");
} else if (input.startsWith("/")) {
- builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", DIRECTORY.getDisplayName()));
+ builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", displayName));
} else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
- builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", DIRECTORY.getDisplayName()));
+ builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", displayName));
} else {
builder.valid(true);
}
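
For reference, here is a minimal, hypothetical sketch (not part of this patch) of how a subclass of AbstractAzureDataLakeStorageProcessor can pick up the new parameterized overloads and the now-public DirectoryValidator(String) constructor; MoveAzureDataLakeStorage below applies exactly this pattern to its Source/Destination properties. The ExampleAdlsProcessor class and its SOURCE_DIRECTORY property are illustrative only.

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;

// Hypothetical processor, shown only to illustrate the refactored helpers.
public class ExampleAdlsProcessor extends AbstractAzureDataLakeStorageProcessor {

    // Validated and reported under its own display name via the new
    // DirectoryValidator(String) constructor instead of the shared DIRECTORY descriptor.
    public static final PropertyDescriptor SOURCE_DIRECTORY = new PropertyDescriptor.Builder()
            .name("source-directory-name")
            .displayName("Source Directory")
            .addValidator(new DirectoryValidator("Source Directory"))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .build();

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // Error messages from these overloads name the property that was passed in,
        // not the shared FILESYSTEM/DIRECTORY descriptors.
        final String fileSystem = evaluateFileSystemProperty(context, flowFile, FILESYSTEM);
        final String sourceDirectory = evaluateDirectoryProperty(context, flowFile, SOURCE_DIRECTORY);
        // ... processor-specific work would go here ...
        session.transfer(flowFile, REL_SUCCESS);
    }
}
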
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
new file mode 100644
index 0000000..cb3d8b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_SOURCE_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
+@CapabilityDescription("Moves content within an Azure Data Lake Storage Gen 2." +
+ " After the move, files will be no longer available on source location.")
+@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_SOURCE_FILESYSTEM, description = ATTR_DESCRIPTION_SOURCE_FILESYSTEM),
+ @WritesAttribute(attribute = ATTR_NAME_SOURCE_DIRECTORY, description = ATTR_DESCRIPTION_SOURCE_DIRECTORY),
+ @WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
+ @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
+ @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
+ @WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
+ @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+ public static final String FAIL_RESOLUTION = "fail";
+ public static final String REPLACE_RESOLUTION = "replace";
+ public static final String IGNORE_RESOLUTION = "ignore";
+
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the same name already exists in the output directory")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
+ .build();
+
+ public static final PropertyDescriptor SOURCE_FILESYSTEM = new PropertyDescriptor.Builder()
+ .name("source-filesystem-name")
+ .displayName("Source Filesystem")
+ .description("Name of the Azure Storage File System from where the move should happen.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .defaultValue(String.format("${%s}", ATTR_NAME_FILESYSTEM))
+ .build();
+
+ public static final PropertyDescriptor SOURCE_DIRECTORY = new PropertyDescriptor.Builder()
+ .name("source-directory-name")
+ .displayName("Source Directory")
+ .description("Name of the Azure Storage Directory from where the move should happen. The Directory Name cannot contain a leading '/'." +
+ " The root directory can be designated by the empty string value.")
+ .addValidator(new DirectoryValidator("Source Directory"))
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .defaultValue(String.format("${%s}", ATTR_NAME_DIRECTORY))
+ .build();
+
+ public static final PropertyDescriptor DESTINATION_FILESYSTEM = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(FILESYSTEM)
+ .displayName("Destination Filesystem")
+ .description("Name of the Azure Storage File System where the files will be moved.")
+ .build();
+
+ public static final PropertyDescriptor DESTINATION_DIRECTORY = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(DIRECTORY)
+ .displayName("Destination Directory")
+ .description("Name of the Azure Storage Directory where the files will be moved. The Directory Name cannot contain a leading '/'." +
+ " The root directory can be designated by the empty string value. Non-existing directories will be created." +
+ " If the original directory structure should be kept, the full directory path needs to be provided after the destination directory." +
+ " e.g.: destdir/${azure.directory}")
+ .addValidator(new DirectoryValidator("Destination Directory"))
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ ADLS_CREDENTIALS_SERVICE,
+ SOURCE_FILESYSTEM,
+ SOURCE_DIRECTORY,
+ DESTINATION_FILESYSTEM,
+ DESTINATION_DIRECTORY,
+ FILE,
+ CONFLICT_RESOLUTION
+ ));
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+ try {
+ final String sourceFileSystem = evaluateFileSystemProperty(context, flowFile, SOURCE_FILESYSTEM);
+ final String sourceDirectory = evaluateDirectoryProperty(context, flowFile, SOURCE_DIRECTORY);
+ final String destinationFileSystem = evaluateFileSystemProperty(context, flowFile, DESTINATION_FILESYSTEM);
+ final String destinationDirectory = evaluateDirectoryProperty(context, flowFile, DESTINATION_DIRECTORY);
+ final String fileName = evaluateFileNameProperty(context, flowFile);
+
+ final String destinationPath;
+ if (!destinationDirectory.isEmpty() && !sourceDirectory.isEmpty()) {
+ destinationPath = destinationDirectory + "/";
+ } else {
+ destinationPath = destinationDirectory;
+ }
+
+ final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
+ final DataLakeFileSystemClient sourceFileSystemClient = storageClient.getFileSystemClient(sourceFileSystem);
+ final DataLakeDirectoryClient sourceDirectoryClient = sourceFileSystemClient.getDirectoryClient(sourceDirectory);
+ final DataLakeFileSystemClient destinationFileSystemClient = storageClient.getFileSystemClient(destinationFileSystem);
+ final DataLakeDirectoryClient destinationDirectoryClient = destinationFileSystemClient.getDirectoryClient(destinationDirectory);
+ DataLakeFileClient sourceFileClient = sourceDirectoryClient.getFileClient(fileName);
+ final DataLakeRequestConditions sourceConditions = new DataLakeRequestConditions();
+ final DataLakeRequestConditions destinationConditions = new DataLakeRequestConditions();
+ final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+ try {
+ if (!destinationDirectory.isEmpty() && !destinationDirectoryClient.exists()) {
+ destinationDirectoryClient.create();
+ }
+
+ if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
+ destinationConditions.setIfNoneMatch("*");
+ }
+
+ final DataLakeFileClient destinationFileClient = sourceFileClient.renameWithResponse(destinationFileSystem,
+ destinationPath + fileName,
+ sourceConditions,
+ destinationConditions,
+ null,
+ null)
+ .getValue();
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ATTR_NAME_SOURCE_FILESYSTEM, sourceFileSystem);
+ attributes.put(ATTR_NAME_SOURCE_DIRECTORY, sourceDirectory);
+ attributes.put(ATTR_NAME_FILESYSTEM, destinationFileSystem);
+ attributes.put(ATTR_NAME_DIRECTORY, destinationDirectory);
+ attributes.put(ATTR_NAME_FILENAME, fileName);
+ attributes.put(ATTR_NAME_PRIMARY_URI, destinationFileClient.getFileUrl());
+ attributes.put(ATTR_NAME_LENGTH, String.valueOf(destinationFileClient.getProperties().getFileSize()));
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ session.transfer(flowFile, REL_SUCCESS);
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, sourceFileClient.getFileUrl(), transferMillis);
+ } catch (DataLakeStorageException dlsException) {
+ if (dlsException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {
+ session.transfer(flowFile, REL_SUCCESS);
+ String warningMessage = String.format("File with the same name already exists. " +
+ "Remote file not modified. " +
+ "Transferring {} to success due to %s being set to '%s'.", CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
+ getLogger().warn(warningMessage, flowFile);
+ } else {
+ throw dlsException;
+ }
+ }
+ } catch (Exception e) {
+ getLogger().error("Failed to move file on Azure Data Lake Storage", e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+}
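
The conflict-resolution behavior above rests on an HTTP precondition: unless the strategy is 'replace', the destination request conditions carry If-None-Match: *, so the service rejects the rename with a 409 when the target file already exists; 'ignore' then routes the FlowFile to success without touching the remote file, while 'fail' propagates the exception and routes it to failure. For illustration only, a standalone sketch of that precondition against the Azure SDK (with placeholder account, key, file system, and path values) might look like this:

import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;

public class RenamePreconditionSketch {

    public static void main(String[] args) {
        // Placeholder account, key, file system and paths; substitute real values.
        final DataLakeServiceClient serviceClient = new DataLakeServiceClientBuilder()
                .endpoint("https://myaccount.dfs.core.windows.net")
                .credential(new StorageSharedKeyCredential("myaccount", "my-account-key"))
                .buildClient();

        final DataLakeFileClient sourceFile = serviceClient.getFileSystemClient("myfilesystem")
                .getDirectoryClient("sourceDir1")
                .getFileClient("file1");

        // If-None-Match: * -> the rename only succeeds when the destination file does not exist yet.
        final DataLakeRequestConditions destinationConditions = new DataLakeRequestConditions();
        destinationConditions.setIfNoneMatch("*");

        try {
            sourceFile.renameWithResponse("myfilesystem", "destDir1/file1",
                    new DataLakeRequestConditions(), destinationConditions, null, null);
        } catch (DataLakeStorageException e) {
            if (e.getStatusCode() == 409) {
                // Destination already exists: MoveAzureDataLakeStorage maps this either to
                // success without modification ('ignore') or to the failure relationship ('fail').
                System.out.println("Destination file already exists; rename was rejected.");
            } else {
                throw e;
            }
        }
    }
}
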
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
index 087cbaa..99f7dfc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
@@ -42,4 +42,10 @@ public final class ADLSAttributes {
public static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri";
public static final String ATTR_DESCRIPTION_PRIMARY_URI = "Primary location for file content";
+ public static final String ATTR_NAME_SOURCE_FILESYSTEM = "azure.source.filesystem";
+ public static final String ATTR_DESCRIPTION_SOURCE_FILESYSTEM = "The name of the source Azure File System";
+
+ public static final String ATTR_NAME_SOURCE_DIRECTORY = "azure.source.directory";
+ public static final String ATTR_DESCRIPTION_SOURCE_DIRECTORY = "The name of the source Azure Directory";
+
}
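
These two attributes let downstream components recover where a moved file originated. A hypothetical fragment (assumed to run inside some downstream processor's onTrigger, shown for illustration only) could read them back like this:

// Attribute names correspond to ATTR_NAME_SOURCE_FILESYSTEM and ATTR_NAME_SOURCE_DIRECTORY above;
// in Expression Language they are available as ${azure.source.filesystem} and ${azure.source.directory}.
final String sourceFileSystem = flowFile.getAttribute("azure.source.filesystem");
final String sourceDirectory = flowFile.getAttribute("azure.source.directory");
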
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 24cadf6..ce7e074 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -30,3 +30,4 @@ org.apache.nifi.processors.azure.storage.ListAzureBlobStorage_v12
org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12
org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12
org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12
+org.apache.nifi.processors.azure.storage.MoveAzureDataLakeStorage
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index 54f0932..383ab16 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -87,22 +87,28 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
}
protected void uploadFile(String directory, String filename, String fileContent) {
- byte[] fileContentBytes = fileContent.getBytes();
+ uploadFile(directory, filename, fileContent.getBytes());
+ }
+
+ protected void uploadFile(TestFile testFile) {
+ uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
+ }
+ protected void uploadFile(String directory, String filename, byte[] fileData) {
DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
DataLakeFileClient fileClient = directoryClient.createFile(filename);
- PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileContentBytes), fileContentBytes.length);
+ PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileData), fileData.length);
}
- protected void uploadFile(TestFile testFile) {
- uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
+ protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
+ createDirectoryAndUploadFile(directory, filename, fileContent.getBytes());
}
- protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
+ protected void createDirectoryAndUploadFile(String directory, String filename, byte[] fileData) {
createDirectory(directory);
- uploadFile(directory, filename, fileContent);
+ uploadFile(directory, filename, fileData);
}
protected void createDirectoryAndUploadFile(TestFile testFile) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
new file mode 100644
index 0000000..d049d9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.google.common.collect.Sets;
+import com.google.common.net.UrlEscapers;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
+
+ private static final String SOURCE_DIRECTORY = "sourceDir1";
+ private static final String DESTINATION_DIRECTORY = "destDir1";
+ private static final String FILE_NAME = "file1";
+ private static final byte[] FILE_DATA = "0123456789".getBytes();
+
+ private static final String EL_FILESYSTEM = "az.filesystem";
+ private static final String EL_DIRECTORY = "az.directory";
+ private static final String EL_FILE_NAME = "az.filename";
+
+ @Override
+ protected Class<? extends Processor> getProcessorClass() {
+ return MoveAzureDataLakeStorage.class;
+ }
+
+ @Before
+ public void setUp() throws InterruptedException {
+ runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, fileSystemName);
+ runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, SOURCE_DIRECTORY);
+ runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, fileSystemName);
+ runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, DESTINATION_DIRECTORY);
+ runner.setProperty(MoveAzureDataLakeStorage.FILE, FILE_NAME);
+ }
+
+ @Test
+ public void testMoveFileToExistingDirectory() throws Exception {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ createDirectory(DESTINATION_DIRECTORY);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveFileToExistingDirectoryWithReplaceResolution() throws Exception {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ createDirectory(DESTINATION_DIRECTORY);
+
+ runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveFileToExistingDirectoryWithIgnoreResolution() throws Exception {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ createDirectory(DESTINATION_DIRECTORY);
+
+ runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveFileToNonExistingDirectory() throws Exception {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveFileToDeepDirectory() throws Exception {
+ String sourceDirectory = "dir1/dir2";
+ String destinationDirectory = sourceDirectory + "/dir3/dir4";
+ createDirectoryAndUploadFile(sourceDirectory, FILE_NAME, FILE_DATA);
+
+ runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory);
+ runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(sourceDirectory, destinationDirectory, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveFileToRootDirectory() throws Exception {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+ String rootDirectory = "";
+ runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, rootDirectory);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(SOURCE_DIRECTORY, rootDirectory, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveEmptyFile() throws Exception {
+ byte[] fileData = new byte[0];
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, fileData);
+
+ runProcessor(fileData);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, fileData);
+ }
+
+ @Test
+ public void testMoveBigFile() throws Exception {
+ Random random = new Random();
+ byte[] fileData = new byte[120_000_000];
+ random.nextBytes(fileData);
+
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, fileData);
+
+ runProcessor(fileData);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, fileData);
+ }
+
+ @Test
+ public void testMoveFileWithNonExistingFileSystem() {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+ runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, "dummy");
+
+ runProcessor(FILE_DATA);
+
+ assertFailure();
+ }
+
+ @Test
+ public void testMoveFileWithInvalidFileName() {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+ runner.setProperty(MoveAzureDataLakeStorage.FILE, "/file1");
+
+ runProcessor(FILE_DATA);
+
+ assertFailure();
+ }
+
+ @Test
+ public void testMoveFileWithSpacesInDirectoryAndFileName() throws Exception {
+ String sourceDirectory = "dir 1";
+ String destinationDirectory = "dest dir1";
+ String fileName = "file 1";
+ createDirectoryAndUploadFile(sourceDirectory, fileName, FILE_DATA);
+
+ runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory);
+ runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory);
+ runner.setProperty(MoveAzureDataLakeStorage.FILE, fileName);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(sourceDirectory, destinationDirectory, fileName, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveFileToExistingFileWithFailResolution() {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ fileSystemClient.createFile(String.format("%s/%s", DESTINATION_DIRECTORY, FILE_NAME));
+
+ runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.FAIL_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertFailure();
+ }
+
+ @Test
+ public void testMoveFileToExistingFileWithReplaceResolution() throws Exception {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ fileSystemClient.createFile(String.format("%s/%s", DESTINATION_DIRECTORY, FILE_NAME));
+
+ runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveFileToExistingFileWithIgnoreResolution() throws Exception {
+ String fileContent = "destination";
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ createDirectoryAndUploadFile(DESTINATION_DIRECTORY, FILE_NAME, fileContent);
+
+ runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccessWithIgnoreResolution(DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA, fileContent.getBytes());
+ }
+
+ @Test
+ public void testMoveFileWithEL() throws Exception {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ Map<String, String> attributes = createAttributesMap(FILE_DATA);
+ setELProperties();
+
+ runProcessor(FILE_DATA, attributes);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testMoveFileWithELButFilesystemIsNotSpecified() {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ Map<String, String> attributes = createAttributesMap(FILE_DATA);
+ attributes.remove(EL_FILESYSTEM);
+ setELProperties();
+
+ runProcessor(FILE_DATA, attributes);
+
+ assertFailure();
+ }
+
+ @Test
+ public void testMoveFileWithELButFileNameIsNotSpecified() {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ Map<String, String> attributes = createAttributesMap(FILE_DATA);
+ attributes.remove(EL_FILE_NAME);
+ setELProperties();
+
+ runProcessor(FILE_DATA, attributes);
+
+ assertFailure();
+ }
+
+ private Map<String, String> createAttributesMap(byte[] fileData) {
+ Map<String, String> attributes = new HashMap<>();
+
+ attributes.put(EL_FILESYSTEM, fileSystemName);
+ attributes.put(EL_DIRECTORY, SOURCE_DIRECTORY);
+ attributes.put(EL_FILE_NAME, FILE_NAME);
+ attributes.put(ATTR_NAME_LENGTH, String.valueOf(fileData.length));
+
+ return attributes;
+ }
+
+ private void setELProperties() {
+ runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
+ runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, String.format("${%s}", EL_DIRECTORY));
+ runner.setProperty(MoveAzureDataLakeStorage.FILE, String.format("${%s}", EL_FILE_NAME));
+ }
+
+ private void runProcessor(byte[] fileData) {
+ runProcessor(fileData, Collections.singletonMap(ATTR_NAME_LENGTH, String.valueOf(fileData.length)));
+ }
+
+ private void runProcessor(byte[] testData, Map<String, String> attributes) {
+ runner.assertValid();
+ runner.enqueue(testData, attributes);
+ runner.run();
+ }
+
+ private void assertSuccess(String sourceDirectory, String destinationDirectory, String fileName, byte[] fileData) throws Exception {
+ assertFlowFile(fileData, fileName, sourceDirectory, destinationDirectory);
+ assertAzureFile(destinationDirectory, fileName, fileData);
+ assertProvenanceEvents();
+ }
+
+ private void assertSuccessWithIgnoreResolution(String destinationDirectory, String fileName, byte[] fileData, byte[] azureFileData) throws Exception {
+ assertFlowFile(fileData);
+ assertAzureFile(destinationDirectory, fileName, azureFileData);
+ }
+
+ private void assertFlowFile(byte[] fileData, String fileName, String sourceDirectory, String destinationDirectory) throws Exception {
+ MockFlowFile flowFile = assertFlowFile(fileData);
+
+ flowFile.assertAttributeEquals(ATTR_NAME_SOURCE_FILESYSTEM, fileSystemName);
+ flowFile.assertAttributeEquals(ATTR_NAME_SOURCE_DIRECTORY, sourceDirectory);
+ flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
+ flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, destinationDirectory);
+ flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);
+
+ String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(destinationDirectory);
+ String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
+ String urlEscapedPathSeparator = UrlEscapers.urlPathSegmentEscaper().escape("/");
+ String primaryUri = StringUtils.isNotEmpty(destinationDirectory)
+ ? String.format("https://%s.dfs.core.windows.net/%s/%s%s%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedPathSeparator, urlEscapedFileName)
+ : String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
+ flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);
+
+ flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
+ }
+
+ private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {
+ runner.assertAllFlowFilesTransferred(MoveAzureDataLakeStorage.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(MoveAzureDataLakeStorage.REL_SUCCESS).get(0);
+
+ flowFile.assertContentEquals(fileData);
+
+ return flowFile;
+ }
+
+ private void assertAzureFile(String destinationDirectory, String fileName, byte[] fileData) {
+ DataLakeFileClient fileClient;
+ if (StringUtils.isNotEmpty(destinationDirectory)) {
+ DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(destinationDirectory);
+ assertTrue(directoryClient.exists());
+
+ fileClient = directoryClient.getFileClient(fileName);
+ } else {
+ fileClient = fileSystemClient.getFileClient(fileName);
+ }
+
+ assertTrue(fileClient.exists());
+ assertEquals(fileData.length, fileClient.getProperties().getFileSize());
+ }
+
+ private void assertProvenanceEvents() {
+ Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.SEND);
+
+ Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+ .map(ProvenanceEventRecord::getEventType)
+ .collect(Collectors.toSet());
+ assertEquals(expectedEventTypes, actualEventTypes);
+ }
+
+ private void assertFailure() {
+ runner.assertAllFlowFilesTransferred(MoveAzureDataLakeStorage.REL_FAILURE, 1);
+ }
+}