You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/02/23 15:10:37 UTC

[nifi] branch main updated: NIFI-9657 Create MoveADLS processor

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a3e1f32  NIFI-9657 Create MoveADLS processor
a3e1f32 is described below

commit a3e1f32cae964df9dee47600016db9d2baba5d34
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Tue Feb 8 10:00:40 2022 +0100

    NIFI-9657 Create MoveADLS processor
    
    This closes #5752.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../AbstractAzureDataLakeStorageProcessor.java     |  39 ++-
 .../azure/storage/MoveAzureDataLakeStorage.java    | 221 ++++++++++++
 .../azure/storage/utils/ADLSAttributes.java        |   6 +
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../storage/AbstractAzureDataLakeStorageIT.java    |  18 +-
 .../azure/storage/ITMoveAzureDataLakeStorage.java  | 370 +++++++++++++++++++++
 6 files changed, 639 insertions(+), 16 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index fbbcf25..d371e12 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -175,19 +175,27 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
     }
 
     public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) {
-        String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+        return evaluateFileSystemProperty(context, flowFile, FILESYSTEM);
+    }
+
+    public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
+        String fileSystem = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
         if (StringUtils.isBlank(fileSystem)) {
-            throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILESYSTEM.getDisplayName()));
+            throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", property.getDisplayName()));
         }
         return fileSystem;
     }
 
     public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) {
-        String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        return evaluateDirectoryProperty(context, flowFile, DIRECTORY);
+    }
+
+    public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
+        String directory = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
         if (directory.startsWith("/")) {
-            throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", DIRECTORY.getDisplayName()));
+            throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", property.getDisplayName()));
         } else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
-            throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", DIRECTORY.getDisplayName()));
+            throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", property.getDisplayName()));
         }
         return directory;
     }
@@ -200,19 +208,30 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
         return fileName;
     }
 
-    private static class DirectoryValidator implements Validator {
-        @Override
+     public static class DirectoryValidator implements Validator {
+         private String displayName;
+
+         public DirectoryValidator() {
+             this.displayName = null;
+         }
+
+         public DirectoryValidator(String displayName) {
+             this.displayName = displayName;
+         }
+
+         @Override
         public ValidationResult validate(String subject, String input, ValidationContext context) {
+            displayName = displayName == null ? DIRECTORY.getDisplayName() : displayName;
             ValidationResult.Builder builder = new ValidationResult.Builder()
-                    .subject(DIRECTORY.getDisplayName())
+                    .subject(displayName)
                     .input(input);
 
             if (context.isExpressionLanguagePresent(input)) {
                 builder.valid(true).explanation("Expression Language Present");
             } else if (input.startsWith("/")) {
-                builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", DIRECTORY.getDisplayName()));
+                builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", displayName));
             } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
-                builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", DIRECTORY.getDisplayName()));
+                builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", displayName));
             } else {
                 builder.valid(true);
             }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
new file mode 100644
index 0000000..cb3d8b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_SOURCE_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
+@CapabilityDescription("Moves content within an Azure Data Lake Storage Gen 2." +
+        " After the move, the files will no longer be available at the source location.")
+@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_SOURCE_FILESYSTEM, description = ATTR_DESCRIPTION_SOURCE_FILESYSTEM),
+        @WritesAttribute(attribute = ATTR_NAME_SOURCE_DIRECTORY, description = ATTR_DESCRIPTION_SOURCE_DIRECTORY),
+        @WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
+        @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
+        @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
+        @WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
+        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    public static final String FAIL_RESOLUTION = "fail";
+    public static final String REPLACE_RESOLUTION = "replace";
+    public static final String IGNORE_RESOLUTION = "ignore";
+
+
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+            .name("conflict-resolution-strategy")
+            .displayName("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the same name already exists in the output directory")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION)
+            .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
+            .build();
+
+    public static final PropertyDescriptor SOURCE_FILESYSTEM = new PropertyDescriptor.Builder()
+            .name("source-filesystem-name")
+            .displayName("Source Filesystem")
+            .description("Name of the Azure Storage File System from where the move should happen.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .defaultValue(String.format("${%s}", ATTR_NAME_FILESYSTEM))
+            .build();
+
+    public static final PropertyDescriptor SOURCE_DIRECTORY = new PropertyDescriptor.Builder()
+            .name("source-directory-name")
+            .displayName("Source Directory")
+            .description("Name of the Azure Storage Directory from where the move should happen. The Directory Name cannot contain a leading '/'." +
+                    " The root directory can be designated by the empty string value.")
+            .addValidator(new DirectoryValidator("Source Directory"))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .defaultValue(String.format("${%s}", ATTR_NAME_DIRECTORY))
+            .build();
+
+    public static final PropertyDescriptor DESTINATION_FILESYSTEM = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(FILESYSTEM)
+            .displayName("Destination Filesystem")
+            .description("Name of the Azure Storage File System where the files will be moved.")
+            .build();
+
+    public static final PropertyDescriptor DESTINATION_DIRECTORY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(DIRECTORY)
+            .displayName("Destination Directory")
+            .description("Name of the Azure Storage Directory where the files will be moved. The Directory Name cannot contain a leading '/'." +
+                    " The root directory can be designated by the empty string value. Non-existing directories will be created." +
+                    " If the original directory structure should be kept, the full directory path needs to be provided after the destination directory." +
+                    " e.g.: destdir/${azure.directory}")
+            .addValidator(new DirectoryValidator("Destination Directory"))
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            ADLS_CREDENTIALS_SERVICE,
+            SOURCE_FILESYSTEM,
+            SOURCE_DIRECTORY,
+            DESTINATION_FILESYSTEM,
+            DESTINATION_DIRECTORY,
+            FILE,
+            CONFLICT_RESOLUTION
+    ));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        try {
+            final String sourceFileSystem = evaluateFileSystemProperty(context, flowFile, SOURCE_FILESYSTEM);
+            final String sourceDirectory = evaluateDirectoryProperty(context, flowFile, SOURCE_DIRECTORY);
+            final String destinationFileSystem = evaluateFileSystemProperty(context, flowFile, DESTINATION_FILESYSTEM);
+            final String destinationDirectory = evaluateDirectoryProperty(context, flowFile, DESTINATION_DIRECTORY);
+            final String fileName = evaluateFileNameProperty(context, flowFile);
+
+            final String destinationPath;
+            if (!destinationDirectory.isEmpty() && !sourceDirectory.isEmpty()) {
+                destinationPath = destinationDirectory + "/";
+            } else {
+                destinationPath = destinationDirectory;
+            }
+
+            final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
+            final DataLakeFileSystemClient sourceFileSystemClient = storageClient.getFileSystemClient(sourceFileSystem);
+            final DataLakeDirectoryClient sourceDirectoryClient = sourceFileSystemClient.getDirectoryClient(sourceDirectory);
+            final DataLakeFileSystemClient destinationFileSystemClient = storageClient.getFileSystemClient(destinationFileSystem);
+            final DataLakeDirectoryClient destinationDirectoryClient = destinationFileSystemClient.getDirectoryClient(destinationDirectory);
+            DataLakeFileClient sourceFileClient = sourceDirectoryClient.getFileClient(fileName);
+            final DataLakeRequestConditions sourceConditions = new DataLakeRequestConditions();
+            final DataLakeRequestConditions destinationConditions = new DataLakeRequestConditions();
+            final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+            try {
+                if (!destinationDirectory.isEmpty() && !destinationDirectoryClient.exists()) {
+                    destinationDirectoryClient.create();
+                }
+
+                if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
+                    destinationConditions.setIfNoneMatch("*");
+                }
+
+                final DataLakeFileClient destinationFileClient = sourceFileClient.renameWithResponse(destinationFileSystem,
+                                destinationPath + fileName,
+                                sourceConditions,
+                                destinationConditions,
+                                null,
+                                null)
+                        .getValue();
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put(ATTR_NAME_SOURCE_FILESYSTEM, sourceFileSystem);
+                attributes.put(ATTR_NAME_SOURCE_DIRECTORY, sourceDirectory);
+                attributes.put(ATTR_NAME_FILESYSTEM, destinationFileSystem);
+                attributes.put(ATTR_NAME_DIRECTORY, destinationDirectory);
+                attributes.put(ATTR_NAME_FILENAME, fileName);
+                attributes.put(ATTR_NAME_PRIMARY_URI, destinationFileClient.getFileUrl());
+                attributes.put(ATTR_NAME_LENGTH, String.valueOf(destinationFileClient.getProperties().getFileSize()));
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.transfer(flowFile, REL_SUCCESS);
+                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                session.getProvenanceReporter().send(flowFile, sourceFileClient.getFileUrl(), transferMillis);
+            } catch (DataLakeStorageException dlsException) {
+                if (dlsException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {
+                    session.transfer(flowFile, REL_SUCCESS);
+                    String warningMessage = String.format("File with the same name already exists. " +
+                            "Remote file not modified. " +
+                            "Transferring {} to success due to %s being set to '%s'.", CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
+                    getLogger().warn(warningMessage, flowFile);
+                } else {
+                    throw dlsException;
+                }
+            }
+        } catch (Exception e) {
+            getLogger().error("Failed to move file on Azure Data Lake Storage", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
index 087cbaa..99f7dfc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
@@ -42,4 +42,10 @@ public final class ADLSAttributes {
     public static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri";
     public static final String ATTR_DESCRIPTION_PRIMARY_URI = "Primary location for file content";
 
+    public static final String ATTR_NAME_SOURCE_FILESYSTEM = "azure.source.filesystem";
+    public static final String ATTR_DESCRIPTION_SOURCE_FILESYSTEM = "The name of the source Azure File System";
+
+    public static final String ATTR_NAME_SOURCE_DIRECTORY = "azure.source.directory";
+    public static final String ATTR_DESCRIPTION_SOURCE_DIRECTORY = "The name of the source Azure Directory";
+
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 24cadf6..ce7e074 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -30,3 +30,4 @@ org.apache.nifi.processors.azure.storage.ListAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12
+org.apache.nifi.processors.azure.storage.MoveAzureDataLakeStorage
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index 54f0932..383ab16 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -87,22 +87,28 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
     }
 
     protected void uploadFile(String directory, String filename, String fileContent) {
-        byte[] fileContentBytes = fileContent.getBytes();
+        uploadFile(directory, filename, fileContent.getBytes());
+    }
+
+    protected void uploadFile(TestFile testFile) {
+        uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
+    }
 
+    protected void uploadFile(String directory, String filename, byte[] fileData) {
         DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
         DataLakeFileClient fileClient = directoryClient.createFile(filename);
 
-        PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileContentBytes), fileContentBytes.length);
+        PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileData), fileData.length);
     }
 
-    protected void uploadFile(TestFile testFile) {
-        uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
+    protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
+        createDirectoryAndUploadFile(directory, filename, fileContent.getBytes());
     }
 
-    protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
+    protected void createDirectoryAndUploadFile(String directory, String filename, byte[] fileData) {
         createDirectory(directory);
 
-        uploadFile(directory, filename, fileContent);
+        uploadFile(directory, filename, fileData);
     }
 
     protected void createDirectoryAndUploadFile(TestFile testFile) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
new file mode 100644
index 0000000..d049d9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.google.common.collect.Sets;
+import com.google.common.net.UrlEscapers;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
+
+    private static final String SOURCE_DIRECTORY = "sourceDir1";
+    private static final String DESTINATION_DIRECTORY = "destDir1";
+    private static final String FILE_NAME = "file1";
+    private static final byte[] FILE_DATA = "0123456789".getBytes();
+
+    private static final String EL_FILESYSTEM = "az.filesystem";
+    private static final String EL_DIRECTORY = "az.directory";
+    private static final String EL_FILE_NAME = "az.filename";
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return MoveAzureDataLakeStorage.class;
+    }
+
+    @Before
+    public void setUp() throws InterruptedException {
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, fileSystemName);
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, SOURCE_DIRECTORY);
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, fileSystemName);
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, DESTINATION_DIRECTORY);
+        runner.setProperty(MoveAzureDataLakeStorage.FILE, FILE_NAME);
+    }
+
+    @Test
+    public void testMoveFileToExistingDirectory() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        createDirectory(DESTINATION_DIRECTORY);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToExistingDirectoryWithReplaceResolution() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        createDirectory(DESTINATION_DIRECTORY);
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToExistingDirectoryWithIgnoreResolution() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        createDirectory(DESTINATION_DIRECTORY);
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToNonExistingDirectory() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToDeepDirectory() throws Exception {
+        String sourceDirectory = "dir1/dir2";
+        String destinationDirectory = sourceDirectory + "/dir3/dir4";
+        createDirectoryAndUploadFile(sourceDirectory, FILE_NAME, FILE_DATA);
+
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory);
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(sourceDirectory, destinationDirectory, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToRootDirectory() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+        String rootDirectory = "";
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, rootDirectory);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, rootDirectory, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveEmptyFile() throws Exception {
+        byte[] fileData = new byte[0];
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, fileData);
+
+        runProcessor(fileData);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, fileData);
+    }
+
+    @Test
+    public void testMoveBigFile() throws Exception {
+        // Fixed seed so the 120 MB payload is identical across runs and a failure
+        // is reproducible; the content itself is irrelevant, only its size matters.
+        Random random = new Random(1);
+        byte[] fileData = new byte[120_000_000];
+        random.nextBytes(fileData);
+
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, fileData);
+
+        runProcessor(fileData);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, fileData);
+    }
+
+    @Test
+    public void testMoveFileWithNonExistingFileSystem() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+        // Destination points at a file system that does not exist; the flow file
+        // is expected on the failure relationship.
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, "dummy");
+
+        runProcessor(FILE_DATA);
+
+        assertFailure();
+    }
+
+    @Test
+    public void testMoveFileWithInvalidFileName() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+        // A leading '/' in the file name is rejected -> failure relationship.
+        runner.setProperty(MoveAzureDataLakeStorage.FILE, "/file1");
+
+        runProcessor(FILE_DATA);
+
+        assertFailure();
+    }
+
+    @Test
+    public void testMoveFileWithSpacesInDirectoryAndFileName() throws Exception {
+        // Names containing spaces must survive the move (and the URI escaping).
+        String srcDir = "dir 1";
+        String destDir = "dest dir1";
+        String spacedFileName = "file 1";
+        createDirectoryAndUploadFile(srcDir, spacedFileName, FILE_DATA);
+
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, srcDir);
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destDir);
+        runner.setProperty(MoveAzureDataLakeStorage.FILE, spacedFileName);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(srcDir, destDir, spacedFileName, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToExistingFileWithFailResolution() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        // Pre-create a file with the same name at the destination to provoke a conflict.
+        fileSystemClient.createFile(String.format("%s/%s", DESTINATION_DIRECTORY, FILE_NAME));
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.FAIL_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertFailure();
+    }
+
+    @Test
+    public void testMoveFileToExistingFileWithReplaceResolution() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        // Pre-create a conflicting destination file; with REPLACE it is overwritten.
+        fileSystemClient.createFile(String.format("%s/%s", DESTINATION_DIRECTORY, FILE_NAME));
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToExistingFileWithIgnoreResolution() throws Exception {
+        // With IGNORE the pre-existing destination file must be kept untouched
+        // while the flow file still goes to success with the source content.
+        String existingContent = "destination";
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        createDirectoryAndUploadFile(DESTINATION_DIRECTORY, FILE_NAME, existingContent);
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccessWithIgnoreResolution(DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA, existingContent.getBytes());
+    }
+
+    @Test
+    public void testMoveFileWithEL() throws Exception {
+        // Source filesystem/directory/file name are supplied as flow file attributes
+        // and resolved through Expression Language (see setELProperties()).
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        Map<String, String> attributes = createAttributesMap(FILE_DATA);
+        setELProperties();
+
+        runProcessor(FILE_DATA, attributes);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileWithELButFilesystemIsNotSpecified() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        Map<String, String> attributes = createAttributesMap(FILE_DATA);
+        // Without the attribute the EL for the source filesystem resolves to empty -> failure.
+        attributes.remove(EL_FILESYSTEM);
+        setELProperties();
+
+        runProcessor(FILE_DATA, attributes);
+
+        assertFailure();
+    }
+
+    @Test
+    public void testMoveFileWithELButFileNameIsNotSpecified() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        Map<String, String> attributes = createAttributesMap(FILE_DATA);
+        // Without the attribute the EL for the file name resolves to empty -> failure.
+        attributes.remove(EL_FILE_NAME);
+        setELProperties();
+
+        runProcessor(FILE_DATA, attributes);
+
+        assertFailure();
+    }
+
+    // Builds the flow file attributes consumed by the EL-based property values
+    // plus the expected content length used by the assertions.
+    private Map<String, String> createAttributesMap(byte[] fileData) {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(ATTR_NAME_LENGTH, String.valueOf(fileData.length));
+        attributes.put(EL_FILE_NAME, FILE_NAME);
+        attributes.put(EL_DIRECTORY, SOURCE_DIRECTORY);
+        attributes.put(EL_FILESYSTEM, fileSystemName);
+        return attributes;
+    }
+
+    // Configures the source properties with Expression Language references so the
+    // actual values are taken from flow file attributes at runtime.
+    private void setELProperties() {
+        runner.setProperty(MoveAzureDataLakeStorage.FILE, String.format("${%s}", EL_FILE_NAME));
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, String.format("${%s}", EL_DIRECTORY));
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
+    }
+
+    // Convenience overload: only the expected-length attribute is set on the flow file.
+    private void runProcessor(byte[] fileData) {
+        runProcessor(fileData, Collections.singletonMap(ATTR_NAME_LENGTH, String.valueOf(fileData.length)));
+    }
+
+    // Enqueues a single flow file with the given content and attributes, then runs the processor once.
+    private void runProcessor(byte[] testData, Map<String, String> attributes) {
+        runner.assertValid();
+        runner.enqueue(testData, attributes);
+        runner.run();
+    }
+
+    // Verifies the outgoing flow file, the moved file on Azure, and the provenance events.
+    private void assertSuccess(String sourceDirectory, String destinationDirectory, String fileName, byte[] fileData) throws Exception {
+        assertFlowFile(fileData, fileName, sourceDirectory, destinationDirectory);
+        assertAzureFile(destinationDirectory, fileName, fileData);
+        assertProvenanceEvents();
+    }
+
+    // For IGNORE resolution: the flow file still carries the source content while
+    // the Azure file keeps its pre-existing content.
+    private void assertSuccessWithIgnoreResolution(String destinationDirectory, String fileName, byte[] fileData, byte[] azureFileData) throws Exception {
+        assertFlowFile(fileData);
+        assertAzureFile(destinationDirectory, fileName, azureFileData);
+    }
+
+    // Checks content plus all source/destination attributes written by the processor,
+    // including the primary URI of the file at its new location.
+    private void assertFlowFile(byte[] fileData, String fileName, String sourceDirectory, String destinationDirectory) throws Exception {
+        MockFlowFile flowFile = assertFlowFile(fileData);
+
+        flowFile.assertAttributeEquals(ATTR_NAME_SOURCE_FILESYSTEM, fileSystemName);
+        flowFile.assertAttributeEquals(ATTR_NAME_SOURCE_DIRECTORY, sourceDirectory);
+        flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
+        flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, destinationDirectory);
+        flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);
+
+        // Directory, separator and file name are each percent-escaped as URL path
+        // segments (spaces and embedded slashes become %XX sequences).
+        String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(destinationDirectory);
+        String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
+        String urlEscapedPathSeparator = UrlEscapers.urlPathSegmentEscaper().escape("/");
+        // For a root-directory destination the directory segment is omitted entirely.
+        String primaryUri = StringUtils.isNotEmpty(destinationDirectory)
+                ? String.format("https://%s.dfs.core.windows.net/%s/%s%s%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedPathSeparator, urlEscapedFileName)
+                : String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
+        flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);
+
+        flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
+    }
+
+    // Asserts exactly one flow file on the success relationship with the expected
+    // content and returns it for further attribute checks.
+    private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {
+        runner.assertAllFlowFilesTransferred(MoveAzureDataLakeStorage.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(MoveAzureDataLakeStorage.REL_SUCCESS).get(0);
+
+        flowFile.assertContentEquals(fileData);
+
+        return flowFile;
+    }
+
+    private void assertAzureFile(String destinationDirectory, String fileName, byte[] fileData) {
+        DataLakeFileClient fileClient;
+        if (StringUtils.isNotEmpty(destinationDirectory)) {
+            DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(destinationDirectory);
+            assertTrue(directoryClient.exists());
+
+            fileClient = directoryClient.getFileClient(fileName);
+        } else {
+            fileClient = fileSystemClient.getFileClient(fileName);
+        }
+
+        assertTrue(fileClient.exists());
+        assertEquals(fileData.length, fileClient.getProperties().getFileSize());
+    }
+
+    // A successful move must be reported as a single SEND provenance event type.
+    private void assertProvenanceEvents() {
+        Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+                .map(ProvenanceEventRecord::getEventType)
+                .collect(Collectors.toSet());
+
+        assertEquals(Sets.newHashSet(ProvenanceEventType.SEND), actualEventTypes);
+    }
+
+    // The single flow file must end up on the failure relationship.
+    private void assertFailure() {
+        runner.assertAllFlowFilesTransferred(MoveAzureDataLakeStorage.REL_FAILURE, 1);
+    }
+}