You are viewing a plain text version of this content. The canonical link is available in the original HTML version of this page.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/08/03 17:27:35 UTC

[nifi] branch main updated: NIFI-7340: Adding ListAzureDataLakeStorage Also added validator for Directory Name property in AbstractAzureDataLakeStorageProcessor Fix Tracking Entities strategy: use milliseconds for lastModified

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c980b64  NIFI-7340: Adding ListAzureDataLakeStorage Also added validator for Directory Name property in AbstractAzureDataLakeStorageProcessor Fix Tracking Entities strategy: use milliseconds for lastModified
c980b64 is described below

commit c980b64bf56f6da39f385ebbc71aa8fc3cd95936
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Wed Jul 22 00:28:54 2020 +0200

    NIFI-7340: Adding ListAzureDataLakeStorage
    Also added validator for Directory Name property in AbstractAzureDataLakeStorageProcessor
    Fix Tracking Entities strategy: use milliseconds for lastModified
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4438.
---
 .../nifi/processor/util/StandardValidators.java    |   2 +
 .../AbstractAzureDataLakeStorageProcessor.java     |  70 +++++-
 .../azure/storage/DeleteAzureDataLakeStorage.java  |  18 +-
 .../azure/storage/FetchAzureDataLakeStorage.java   |  18 +-
 .../azure/storage/ListAzureDataLakeStorage.java    | 264 ++++++++++++++++++++
 .../azure/storage/PutAzureDataLakeStorage.java     |  49 ++--
 .../azure/storage/utils/ADLSAttributes.java        |  45 ++++
 .../azure/storage/utils/ADLSFileInfo.java          | 199 +++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   3 +-
 .../storage/AbstractAzureDataLakeStorageIT.java    |  48 +++-
 .../azure/storage/ITListAzureDataLakeStorage.java  | 267 +++++++++++++++++++++
 .../azure/storage/ITPutAzureDataLakeStorage.java   |  24 +-
 .../storage/TestAbstractAzureDataLakeStorage.java  |  25 ++
 13 files changed, 956 insertions(+), 76 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index 11962bc..5ada3dd 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -414,6 +414,8 @@ public class StandardValidators {
 
     public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false);
 
+    public static final Validator REGULAR_EXPRESSION_WITH_EL_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, true);
+
     public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() {
         @Override
         public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index fd0be04..bb22c57 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -33,17 +33,23 @@ import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
 import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
 import reactor.core.publisher.Mono;
 
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+
 public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
 
     public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
@@ -57,15 +63,16 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
     public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
             .name("filesystem-name").displayName("Filesystem Name")
             .description("Name of the Azure Storage File System. It is assumed to be already existing.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
             .build();
 
     public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
             .name("directory-name").displayName("Directory Name")
-            .description("Name of the Azure Storage Directory. In case of the PutAzureDatalakeStorage processor, it will be created if not already existing.")
-            .addValidator(Validator.VALID)
+            .description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " +
+                    "In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.")
+            .addValidator(new DirectoryValidator())
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
             .build();
@@ -73,10 +80,10 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
     public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
             .name("file-name").displayName("File Name")
             .description("The filename")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
-            .defaultValue("${azure.filename}")
+            .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
             .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
@@ -103,6 +110,11 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
         return PROPERTIES;
     }
 
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
     public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
         final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
 
@@ -146,8 +158,50 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
         return storageClient;
     }
 
-    @Override
-    public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
+    public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) {
+        String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+        if (StringUtils.isBlank(fileSystem)) {
+            throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILESYSTEM.getDisplayName()));
+        }
+        return fileSystem;
+    }
+
+    public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) {
+        String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        if (directory.startsWith("/")) {
+            throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", DIRECTORY.getDisplayName()));
+        } else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
+            throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", DIRECTORY.getDisplayName()));
+        }
+        return directory;
+    }
+
+    public static String evaluateFileNameProperty(ProcessContext context, FlowFile flowFile) {
+        String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+        if (StringUtils.isBlank(fileName)) {
+            throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILE.getDisplayName()));
+        }
+        return fileName;
+    }
+
+    private static class DirectoryValidator implements Validator {
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            ValidationResult.Builder builder = new ValidationResult.Builder()
+                    .subject(DIRECTORY.getDisplayName())
+                    .input(input);
+
+            if (context.isExpressionLanguagePresent(input)) {
+                builder.valid(true).explanation("Expression Language Present");
+            } else if (input.startsWith("/")) {
+                builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", DIRECTORY.getDisplayName()));
+            } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
+                builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", DIRECTORY.getDisplayName()));
+            } else {
+                builder.valid(true);
+            }
+
+            return builder.build();
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 314785a..0e5128e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -20,7 +20,6 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,7 +34,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 import java.util.concurrent.TimeUnit;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
+@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
 @CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@@ -49,18 +48,9 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
 
         final long startNanos = System.nanoTime();
         try {
-            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
-            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
-
-            if (StringUtils.isBlank(fileSystem)) {
-                throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
-                        FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
-            }
-            if (StringUtils.isBlank(fileName)) {
-                throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
-                        FILE.getDisplayName() + " must be specified as a non-empty string.");
-            }
+            final String fileSystem = evaluateFileSystemProperty(context, flowFile);
+            final String directory = evaluateDirectoryProperty(context, flowFile);
+            final String fileName = evaluateFileNameProperty(context, flowFile);
 
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 5af7988..a4febcb 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -20,7 +20,6 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,7 +34,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 import java.util.concurrent.TimeUnit;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
+@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
 @CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@@ -49,18 +48,9 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
 
         final long startNanos = System.nanoTime();
         try {
-            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
-            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
-
-            if (StringUtils.isBlank(fileSystem)) {
-                throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
-                        FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
-            }
-            if (StringUtils.isBlank(fileName)) {
-                throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
-                        FILE.getDisplayName() + " must be specified as a non-empty string.");
-            }
+            final String fileSystem = evaluateFileSystemProperty(context, flowFile);
+            final String directory = evaluateDirectoryProperty(context, flowFile);
+            final String fileName = evaluateFileNameProperty(context, flowFile);
 
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
new file mode 100644
index 0000000..c726e81
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.ListPathsOptions;
+import org.apache.commons.lang3.RegExUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processor.util.list.ListedEntityTracker.INITIAL_LISTING_TARGET;
+import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
+import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_TIME_WINDOW;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILE_PATH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LAST_MODIFIED;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
+@CapabilityDescription("Lists directory in an Azure Data Lake Storage Gen 2 filesystem")
+@WritesAttributes({
+        @WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
+        @WritesAttribute(attribute = ATTR_NAME_FILE_PATH, description = ATTR_DESCRIPTION_FILE_PATH),
+        @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
+        @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
+        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
+        @WritesAttribute(attribute = ATTR_NAME_LAST_MODIFIED, description = ATTR_DESCRIPTION_LAST_MODIFIED),
+        @WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG)
+})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. " +
+        "This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. State is " +
+        "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
+        "where the previous node left off, without duplicating the data.")
+public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo> {
+
+    public static final PropertyDescriptor RECURSE_SUBDIRECTORIES = new PropertyDescriptor.Builder()
+            .name("recurse-subdirectories")
+            .displayName("Recurse Subdirectories")
+            .description("Indicates whether to list files from subdirectories of the directory")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+            .name("file-filter")
+            .displayName("File Filter")
+            .description("Only files whose names match the given regular expression will be listed")
+            .required(false)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
+            .name("path-filter")
+            .displayName("Path Filter")
+            .description(String.format("When '%s' is true, then only subdirectories whose paths match the given regular expression will be scanned", RECURSE_SUBDIRECTORIES.getDisplayName()))
+            .required(false)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            ADLS_CREDENTIALS_SERVICE,
+            FILESYSTEM,
+            DIRECTORY,
+            RECURSE_SUBDIRECTORIES,
+            FILE_FILTER,
+            PATH_FILTER,
+            RECORD_WRITER,
+            LISTING_STRATEGY,
+            TRACKING_STATE_CACHE,
+            TRACKING_TIME_WINDOW,
+            INITIAL_LISTING_TARGET));
+
+    private static final Set<PropertyDescriptor> LISTING_RESET_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            ADLS_CREDENTIALS_SERVICE,
+            FILESYSTEM,
+            DIRECTORY,
+            RECURSE_SUBDIRECTORIES,
+            FILE_FILTER,
+            PATH_FILTER,
+            LISTING_STRATEGY)));
+
+    private volatile Pattern filePattern;
+    private volatile Pattern pathPattern;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        String fileFilter = context.getProperty(FILE_FILTER).evaluateAttributeExpressions().getValue();
+        filePattern = fileFilter != null ? Pattern.compile(fileFilter) : null;
+
+        String pathFilter = context.getProperty(PATH_FILTER).evaluateAttributeExpressions().getValue();
+        pathPattern = pathFilter != null ? Pattern.compile(pathFilter) : null;
+    }
+
+    @OnStopped
+    public void onStopped() {
+        filePattern = null;
+        pathPattern = null;
+    }
+
+    @Override
+    protected void customValidate(ValidationContext context, Collection<ValidationResult> results) {
+        if (context.getProperty(PATH_FILTER).isSet() && !context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean()) {
+            results.add(new ValidationResult.Builder()
+                    .subject(PATH_FILTER.getDisplayName())
+                    .valid(false)
+                    .explanation(String.format("'%s' cannot be set when '%s' is false", PATH_FILTER.getDisplayName(), RECURSE_SUBDIRECTORIES.getDisplayName()))
+                    .build());
+        }
+    }
+
+    @Override
+    protected RecordSchema getRecordSchema() {
+        return ADLSFileInfo.getRecordSchema();
+    }
+
+    @Override
+    protected Scope getStateScope(PropertyContext context) {
+        return Scope.CLUSTER;
+    }
+
+    @Override
+    protected String getDefaultTimePrecision() {
+        return PRECISION_MILLIS.getValue();
+    }
+
+    @Override
+    protected boolean isListingResetNecessary(PropertyDescriptor property) {
+        return LISTING_RESET_PROPERTIES.contains(property);
+    }
+
+    @Override
+    protected String getPath(ProcessContext context) {
+        String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
+        return directory != null ? directory : ".";
+    }
+
+    @Override
+    protected List<ADLSFileInfo> performListing(ProcessContext context, Long minTimestamp) throws IOException {
+        try {
+            String fileSystem = evaluateFileSystemProperty(context, null);
+            String baseDirectory = evaluateDirectoryProperty(context, null);
+            boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
+
+            DataLakeServiceClient storageClient = getStorageClient(context, null);
+            DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
+
+            ListPathsOptions options = new ListPathsOptions();
+            options.setPath(baseDirectory);
+            options.setRecursive(recurseSubdirectories);
+
+            Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
+
+            List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
+                    .filter(pathItem -> !pathItem.isDirectory())
+                    .map(pathItem -> new ADLSFileInfo.Builder()
+                            .fileSystem(fileSystem)
+                            .filePath(pathItem.getName())
+                            .length(pathItem.getContentLength())
+                            .lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
+                            .etag(pathItem.getETag())
+                            .build())
+                    .filter(fileInfo -> filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches())
+                    .filter(fileInfo -> pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches())
+                    .collect(Collectors.toList());
+
+            return listing;
+        } catch (Exception e) {
+            getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
+            throw new IOException(ExceptionUtils.getRootCause(e));
+        }
+    }
+
+    @Override
+    protected Map<String, String> createAttributes(ADLSFileInfo fileInfo, ProcessContext context) {
+        Map<String, String> attributes = new HashMap<>();
+
+        attributes.put(ATTR_NAME_FILESYSTEM, fileInfo.getFileSystem());
+        attributes.put(ATTR_NAME_FILE_PATH, fileInfo.getFilePath());
+        attributes.put(ATTR_NAME_DIRECTORY, fileInfo.getDirectory());
+        attributes.put(ATTR_NAME_FILENAME, fileInfo.getFilename());
+        attributes.put(ATTR_NAME_LENGTH, String.valueOf(fileInfo.getLength()));
+        attributes.put(ATTR_NAME_LAST_MODIFIED, String.valueOf(fileInfo.getLastModified()));
+        attributes.put(ATTR_NAME_ETAG, fileInfo.getEtag());
+
+        return attributes;
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 081372f..ae67e3e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -21,7 +21,6 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.models.DataLakeStorageException;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -46,14 +45,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
 @CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
-@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
-        @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
-        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
-        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
-        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
+        @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
+        @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
+        @WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
+        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
@@ -93,18 +103,9 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
 
         final long startNanos = System.nanoTime();
         try {
-            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
-            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
-
-            if (StringUtils.isBlank(fileSystem)) {
-                throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
-                        FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
-            }
-            if (StringUtils.isBlank(fileName)) {
-                throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
-                        FILE.getDisplayName() + " must be specified as a non-empty string.");
-            }
+            final String fileSystem = evaluateFileSystemProperty(context, flowFile);
+            final String directory = evaluateDirectoryProperty(context, flowFile);
+            final String fileName = evaluateFileNameProperty(context, flowFile);
 
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
@@ -126,11 +127,11 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
                 fileClient.flush(length);
 
                 final Map<String, String> attributes = new HashMap<>();
-                attributes.put("azure.filesystem", fileSystem);
-                attributes.put("azure.directory", directory);
-                attributes.put("azure.filename", fileName);
-                attributes.put("azure.primaryUri", fileClient.getFileUrl());
-                attributes.put("azure.length", String.valueOf(length));
+                attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
+                attributes.put(ATTR_NAME_DIRECTORY, directory);
+                attributes.put(ATTR_NAME_FILENAME, fileName);
+                attributes.put(ATTR_NAME_PRIMARY_URI, fileClient.getFileUrl());
+                attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
                 flowFile = session.putAllAttributes(flowFile, attributes);
 
                 session.transfer(flowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
new file mode 100644
index 0000000..087cbaa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+public final class ADLSAttributes {
+
+    public static final String ATTR_NAME_FILESYSTEM = "azure.filesystem";
+    public static final String ATTR_DESCRIPTION_FILESYSTEM = "The name of the Azure File System";
+
+    public static final String ATTR_NAME_DIRECTORY = "azure.directory";
+    public static final String ATTR_DESCRIPTION_DIRECTORY = "The name of the Azure Directory";
+
+    public static final String ATTR_NAME_FILENAME = "azure.filename";
+    public static final String ATTR_DESCRIPTION_FILENAME = "The name of the Azure File";
+
+    public static final String ATTR_NAME_LENGTH = "azure.length";
+    public static final String ATTR_DESCRIPTION_LENGTH = "The length of the Azure File";
+
+    public static final String ATTR_NAME_LAST_MODIFIED = "azure.lastModified";
+    public static final String ATTR_DESCRIPTION_LAST_MODIFIED = "The last modification time of the Azure File";
+
+    public static final String ATTR_NAME_ETAG = "azure.etag";
+    public static final String ATTR_DESCRIPTION_ETAG = "The ETag of the Azure File";
+
+    public static final String ATTR_NAME_FILE_PATH = "azure.filePath";
+    public static final String ATTR_DESCRIPTION_FILE_PATH = "The full path of the Azure File";
+
+    public static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri";
+    public static final String ATTR_DESCRIPTION_PRIMARY_URI = "Primary location for file content";
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java
new file mode 100644
index 0000000..7712381
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.util.list.ListableEntity;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ADLSFileInfo implements Comparable<ADLSFileInfo>, Serializable, ListableEntity {
+
+    private static final RecordSchema SCHEMA;
+    private static final String FILESYSTEM = "filesystem";
+    private static final String FILE_PATH = "filePath";
+    private static final String DIRECTORY = "directory";
+    private static final String FILENAME = "filename";
+    private static final String LENGTH = "length";
+    private static final String LAST_MODIFIED = "lastModified";
+    private static final String ETAG = "etag";
+
+    static {
+        List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField(FILESYSTEM, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(FILE_PATH, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(DIRECTORY, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(LENGTH, RecordFieldType.LONG.getDataType(), false));
+        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType()));
+        SCHEMA = new SimpleRecordSchema(recordFields);
+    }
+
+    private static final Comparator<ADLSFileInfo> COMPARATOR = Comparator.comparing(ADLSFileInfo::getFileSystem).thenComparing(ADLSFileInfo::getFilePath);
+
+    private final String fileSystem;
+    private final String filePath;
+    private final long length;
+    private final long lastModified;
+    private final String etag;
+
+    private ADLSFileInfo(Builder builder) {
+        this.fileSystem = builder.fileSystem;
+        this.filePath = builder.filePath;
+        this.length = builder.length;
+        this.lastModified = builder.lastModified;
+        this.etag = builder.etag;
+    }
+
+    public String getFileSystem() {
+        return fileSystem;
+    }
+
+    public String getFilePath() {
+        return filePath;
+    }
+
+    public long getLength() {
+        return length;
+    }
+
+    public long getLastModified() {
+        return lastModified;
+    }
+
+    public String getEtag() {
+        return etag;
+    }
+
+    public String getDirectory() {
+        return filePath.contains("/") ? StringUtils.substringBeforeLast(filePath, "/") : "";
+    }
+
+    public String getFilename() {
+        return filePath.contains("/") ? StringUtils.substringAfterLast(filePath, "/") : filePath;
+    }
+
+    @Override
+    public String getName() {
+        return getFilePath();
+    }
+
+    @Override
+    public String getIdentifier() {
+        return getFilePath();
+    }
+
+    @Override
+    public long getTimestamp() {
+        return getLastModified();
+    }
+
+    @Override
+    public long getSize() {
+        return getLength();
+    }
+
+    public static RecordSchema getRecordSchema() {
+        return SCHEMA;
+    }
+
+    @Override
+    public Record toRecord() {
+        Map<String, Object> values = new HashMap<>();
+        values.put(FILESYSTEM, getFileSystem());
+        values.put(FILE_PATH, getFilePath());
+        values.put(DIRECTORY, getDirectory());
+        values.put(FILENAME, getFilename());
+        values.put(LENGTH, getLength());
+        values.put(LAST_MODIFIED, getLastModified());
+        values.put(ETAG, getEtag());
+        return new MapRecord(SCHEMA, values);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+        ADLSFileInfo otherFileInfo = (ADLSFileInfo) other;
+        return Objects.equals(fileSystem, otherFileInfo.fileSystem)
+                && Objects.equals(filePath, otherFileInfo.filePath);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fileSystem, filePath);
+    }
+
+    @Override
+    public int compareTo(ADLSFileInfo other) {
+        return COMPARATOR.compare(this, other);
+    }
+
+    public static class Builder {
+        private String fileSystem;
+        private String filePath;
+        private long length;
+        private long lastModified;
+        private String etag;
+
+        public Builder fileSystem(String fileSystem) {
+            this.fileSystem = fileSystem;
+            return this;
+        }
+
+        public Builder filePath(String filePath) {
+            this.filePath = filePath;
+            return this;
+        }
+
+        public Builder length(long length) {
+            this.length = length;
+            return this;
+        }
+
+        public Builder lastModified(long lastModified) {
+            this.lastModified = lastModified;
+            return this;
+        }
+
+        public Builder etag(String etag) {
+            this.etag = etag;
+            return this;
+        }
+
+        public ADLSFileInfo build() {
+            return new ADLSFileInfo(this);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6e09330..015596e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -23,4 +23,5 @@ org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
 org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
 org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
 org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
-org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
\ No newline at end of file
+org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
+org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index da03eae..553621b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -22,6 +22,7 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService;
@@ -35,6 +36,7 @@ import java.util.UUID;
 public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {
 
     private static final String FILESYSTEM_NAME_PREFIX = "nifi-test-filesystem";
+    private static final String TEST_FILE_CONTENT = "test";
 
     protected String fileSystemName;
     protected DataLakeFileSystemClient fileSystemClient;
@@ -72,6 +74,10 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
                 .buildClient();
     }
 
+    protected void createDirectory(String directory) {
+        fileSystemClient.createDirectory(directory);
+    }
+
     protected void uploadFile(String directory, String filename, String fileContent) {
         byte[] fileContentBytes = fileContent.getBytes();
 
@@ -82,9 +88,49 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
         fileClient.flush(fileContentBytes.length);
     }
 
+    protected void uploadFile(TestFile testFile) {
+        uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
+    }
+
     protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
-        fileSystemClient.createDirectory(directory);
+        createDirectory(directory);
 
         uploadFile(directory, filename, fileContent);
     }
+
+    protected void createDirectoryAndUploadFile(TestFile testFile) {
+        createDirectoryAndUploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
+    }
+
+    protected static class TestFile {
+        private final String directory;
+        private final String filename;
+        private final String fileContent;
+
+        public TestFile(String directory, String filename, String fileContent) {
+            this.directory = directory;
+            this.filename = filename;
+            this.fileContent = fileContent;
+        }
+
+        public TestFile(String directory, String filename) {
+            this(directory, filename, TEST_FILE_CONTENT);
+        }
+
+        public String getDirectory() {
+            return directory;
+        }
+
+        public String getFilename() {
+            return filename;
+        }
+
+        public String getFileContent() {
+            return fileContent;
+        }
+
+        public String getFilePath() {
+            return StringUtils.isNotBlank(directory) ? String.format("%s/%s", directory, filename) : filename;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
new file mode 100644
index 0000000..fd308bc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
+
+    private Map<String, TestFile> testFiles;
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return ListAzureDataLakeStorage.class;
+    }
+
+    @Before
+    public void setUp() {
+        testFiles = new HashMap<>();
+
+        TestFile testFile1 = new TestFile("", "file1");
+        uploadFile(testFile1);
+        testFiles.put(testFile1.getFilePath(), testFile1);
+
+        TestFile testFile2 = new TestFile("", "file2");
+        uploadFile(testFile2);
+        testFiles.put(testFile2.getFilePath(), testFile2);
+
+        TestFile testFile11 = new TestFile("dir1", "file11");
+        createDirectoryAndUploadFile(testFile11);
+        testFiles.put(testFile11.getFilePath(), testFile11);
+
+        TestFile testFile12 = new TestFile("dir1", "file12");
+        uploadFile(testFile12);
+        testFiles.put(testFile12.getFilePath(), testFile12);
+
+        TestFile testFile111 = new TestFile("dir1/dir11", "file111");
+        createDirectoryAndUploadFile(testFile111);
+        testFiles.put(testFile111.getFilePath(), testFile111);
+
+        TestFile testFile21 = new TestFile("dir 2", "file 21");
+        createDirectoryAndUploadFile(testFile21);
+        testFiles.put(testFile21.getFilePath(), testFile21);
+
+        createDirectory("dir3");
+    }
+
+    @Test
+    public void testListRootRecursive() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+
+        runProcessor();
+
+        assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
+    }
+
+    @Test
+    public void testListRootNonRecursive() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+        runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
+
+        runProcessor();
+
+        assertSuccess("file1", "file2");
+    }
+
+    @Test
+    public void testListSubdirectoryRecursive() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
+    }
+
+    @Test
+    public void testListSubdirectoryNonRecursive() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
+        runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/file12");
+    }
+
+    @Test
+    public void testListWithFileFilter() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file1.*$");
+
+        runProcessor();
+
+        assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
+    }
+
+    @Test
+    public void testListWithFileFilterWithEL() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file${suffix}$");
+        runner.setVariable("suffix", "1.*");
+
+        runProcessor();
+
+        assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
+    }
+
+    @Test
+    public void testListRootWithPathFilter() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+        runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
+    }
+
+    @Test
+    public void testListRootWithPathFilterWithEL() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+        runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
+        runner.setVariable("prefix", "^dir");
+        runner.setVariable("suffix", "1.*$");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
+    }
+
+    @Test
+    public void testListSubdirectoryWithPathFilter() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
+        runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
+
+        runProcessor();
+
+        assertSuccess("dir1/dir11/file111");
+    }
+
+    @Test
+    public void testListRootWithFileAndPathFilter() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+        runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
+        runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
+
+        runProcessor();
+
+        assertSuccess("dir1/file11", "dir1/dir11/file111");
+    }
+
+    @Test
+    public void testListEmptyDirectory() throws Exception {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3");
+
+        runProcessor();
+
+        assertSuccess();
+    }
+
+    @Test
+    public void testListNonExistingDirectory() {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dummy");
+
+        runProcessor();
+
+        assertFailure();
+    }
+
+    @Test
+    public void testListWithNonExistingFileSystem() {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, "dummy");
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+
+        runProcessor();
+
+        assertFailure();
+    }
+
+    @Test
+    public void testListWithRecords() throws InitializationException {
+        runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, false);
+        runner.addControllerService("record-writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "3");
+    }
+
+    private void runProcessor() {
+        runner.assertValid();
+        runner.run();
+    }
+
+    private void assertSuccess(String... testFilePaths) throws Exception {
+        runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, testFilePaths.length);
+
+        Map<String, TestFile> expectedFiles = new HashMap<>(testFiles);
+        expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths));
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS);
+
+        for (MockFlowFile flowFile : flowFiles) {
+            String filePath = flowFile.getAttribute("azure.filePath");
+            TestFile testFile = expectedFiles.remove(filePath);
+            assertNotNull("File path not found in the expected map", testFile);
+            assertFlowFile(testFile, flowFile);
+        }
+    }
+
+    private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) throws Exception {
+        flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
+        flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath());
+        flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory());
+        flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, testFile.getFilename());
+        flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, String.valueOf(testFile.getFileContent().length()));
+
+        flowFile.assertAttributeExists(ATTR_NAME_LAST_MODIFIED);
+        flowFile.assertAttributeExists(ATTR_NAME_ETAG);
+
+        flowFile.assertContentEquals("");
+    }
+
+    private void assertFailure() {
+        assertFalse(runner.getLogger().getErrorMessages().isEmpty());
+        runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 049031e..324e20d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -35,6 +35,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -149,15 +154,6 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
     }
 
     @Test
-    public void testPutFileWithInvalidDirectory() {
-        runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, "/dir1");
-
-        runProcessor(FILE_DATA);
-
-        assertFailure();
-    }
-
-    @Test
     public void testPutFileWithInvalidFileName() {
         runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1");
 
@@ -282,18 +278,18 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
     private void assertFlowFile(byte[] fileData, String fileName, String directory) throws Exception {
         MockFlowFile flowFile = assertFlowFile(fileData);
 
-        flowFile.assertAttributeEquals("azure.filesystem", fileSystemName);
-        flowFile.assertAttributeEquals("azure.directory", directory);
-        flowFile.assertAttributeEquals("azure.filename", fileName);
+        flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
+        flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, directory);
+        flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);
 
         String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(directory);
         String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
         String primaryUri = StringUtils.isNotEmpty(directory)
                 ? String.format("https://%s.dfs.core.windows.net/%s/%s/%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedFileName)
                 : String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
-        flowFile.assertAttributeEquals("azure.primaryUri", primaryUri);
+        flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);
 
-        flowFile.assertAttributeEquals("azure.length", Integer.toString(fileData.length));
+        flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
     }
 
     private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
index 94fc576..a4a4c64 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
@@ -47,7 +47,11 @@ public class TestAbstractAzureDataLakeStorage {
         runner.setProperty(DIRECTORY, "directory");
         runner.setProperty(FILE, "file");
         runner.setProperty(ADLS_CREDENTIALS_SERVICE, "credentials_service");
+    }
 
+    @Test
+    public void testValid() {
+        runner.assertValid();
     }
 
     @Test
@@ -80,6 +84,27 @@ public class TestAbstractAzureDataLakeStorage {
     }
 
     @Test
+    public void testNotValidWhenDirectoryIsSlash() {
+        runner.setProperty(DIRECTORY, "/");
+
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testNotValidWhenDirectoryStartsWithSlash() {
+        runner.setProperty(DIRECTORY, "/directory");
+
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testNotValidWhenDirectoryIsWhitespaceOnly() {
+        runner.setProperty(DIRECTORY, "   ");
+
+        runner.assertNotValid();
+    }
+
+    @Test
     public void testValidWhenNoFileSpecified() {
         // the default value will be used
         runner.removeProperty(FILE);