You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/08/03 17:27:35 UTC
[nifi] branch main updated: NIFI-7340: Adding ListAzureDataLakeStorage.
Also added validator for Directory Name property
in AbstractAzureDataLakeStorageProcessor.
Fix Tracking Entities strategy: use milliseconds for lastModified.
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c980b64 NIFI-7340: Adding ListAzureDataLakeStorage Also added validator for Directory Name property in AbstractAzureDataLakeStorageProcessor Fix Tracking Entities strategy: use milliseconds for lastModified
c980b64 is described below
commit c980b64bf56f6da39f385ebbc71aa8fc3cd95936
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Wed Jul 22 00:28:54 2020 +0200
NIFI-7340: Adding ListAzureDataLakeStorage
Also added validator for Directory Name property in AbstractAzureDataLakeStorageProcessor
Fix Tracking Entities strategy: use milliseconds for lastModified
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4438.
---
.../nifi/processor/util/StandardValidators.java | 2 +
.../AbstractAzureDataLakeStorageProcessor.java | 70 +++++-
.../azure/storage/DeleteAzureDataLakeStorage.java | 18 +-
.../azure/storage/FetchAzureDataLakeStorage.java | 18 +-
.../azure/storage/ListAzureDataLakeStorage.java | 264 ++++++++++++++++++++
.../azure/storage/PutAzureDataLakeStorage.java | 49 ++--
.../azure/storage/utils/ADLSAttributes.java | 45 ++++
.../azure/storage/utils/ADLSFileInfo.java | 199 +++++++++++++++
.../services/org.apache.nifi.processor.Processor | 3 +-
.../storage/AbstractAzureDataLakeStorageIT.java | 48 +++-
.../azure/storage/ITListAzureDataLakeStorage.java | 267 +++++++++++++++++++++
.../azure/storage/ITPutAzureDataLakeStorage.java | 24 +-
.../storage/TestAbstractAzureDataLakeStorage.java | 25 ++
13 files changed, 956 insertions(+), 76 deletions(-)
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index 11962bc..5ada3dd 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -414,6 +414,8 @@ public class StandardValidators {
public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false);
+ public static final Validator REGULAR_EXPRESSION_WITH_EL_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, true);
+
public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index fd0be04..bb22c57 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -33,17 +33,23 @@ import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import reactor.core.publisher.Mono;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
@@ -57,15 +63,16 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System. It is assumed to be already existing.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("directory-name").displayName("Directory Name")
- .description("Name of the Azure Storage Directory. In case of the PutAzureDatalakeStorage processor, it will be created if not already existing.")
- .addValidator(Validator.VALID)
+ .description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " +
+ "In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.")
+ .addValidator(new DirectoryValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
@@ -73,10 +80,10 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
.name("file-name").displayName("File Name")
.description("The filename")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
- .defaultValue("${azure.filename}")
+ .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
@@ -103,6 +110,11 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
return PROPERTIES;
}
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
@@ -146,8 +158,50 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
return storageClient;
}
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
+ public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) {
+ String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isBlank(fileSystem)) {
+ throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%1$s' must be specified as a non-blank string.", FILESYSTEM.getDisplayName()));
+ }
+ return fileSystem;
+ }
+
+ public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) {
+ String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+ if (directory.startsWith("/")) {
+ throw new ProcessException(String.format("'%1$s' starts with '/'. '%1$s' cannot contain a leading '/'.", DIRECTORY.getDisplayName()));
+ } else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
+ throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", DIRECTORY.getDisplayName()));
+ }
+ return directory;
+ }
+
+ public static String evaluateFileNameProperty(ProcessContext context, FlowFile flowFile) {
+ String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isBlank(fileName)) {
+ throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%1$s' must be specified as a non-blank string.", FILE.getDisplayName()));
+ }
+ return fileName;
+ }
+
+ private static class DirectoryValidator implements Validator {
+ @Override
+ public ValidationResult validate(String subject, String input, ValidationContext context) {
+ ValidationResult.Builder builder = new ValidationResult.Builder()
+ .subject(DIRECTORY.getDisplayName())
+ .input(input);
+
+ if (context.isExpressionLanguagePresent(input)) {
+ builder.valid(true).explanation("Expression Language Present");
+ } else if (input.startsWith("/")) {
+ builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", DIRECTORY.getDisplayName()));
+ } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
+ builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", DIRECTORY.getDisplayName()));
+ } else {
+ builder.valid(true);
+ }
+
+ return builder.build();
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 314785a..0e5128e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -20,7 +20,6 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,7 +34,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
+@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@@ -49,18 +48,9 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
final long startNanos = System.nanoTime();
try {
- final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
- final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
-
- if (StringUtils.isBlank(fileSystem)) {
- throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
- FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
- }
- if (StringUtils.isBlank(fileName)) {
- throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
- FILE.getDisplayName() + " must be specified as a non-empty string.");
- }
+ final String fileSystem = evaluateFileSystemProperty(context, flowFile);
+ final String directory = evaluateDirectoryProperty(context, flowFile);
+ final String fileName = evaluateFileNameProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 5af7988..a4febcb 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -20,7 +20,6 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,7 +34,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
+@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@@ -49,18 +48,9 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
final long startNanos = System.nanoTime();
try {
- final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
- final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
-
- if (StringUtils.isBlank(fileSystem)) {
- throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
- FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
- }
- if (StringUtils.isBlank(fileName)) {
- throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
- FILE.getDisplayName() + " must be specified as a non-empty string.");
- }
+ final String fileSystem = evaluateFileSystemProperty(context, flowFile);
+ final String directory = evaluateDirectoryProperty(context, flowFile);
+ final String fileName = evaluateFileNameProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
new file mode 100644
index 0000000..c726e81
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.ListPathsOptions;
+import org.apache.commons.lang3.RegExUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processor.util.list.ListedEntityTracker.INITIAL_LISTING_TARGET;
+import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
+import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_TIME_WINDOW;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILE_PATH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LAST_MODIFIED;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
+@CapabilityDescription("Lists directory in an Azure Data Lake Storage Gen 2 filesystem")
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
+ @WritesAttribute(attribute = ATTR_NAME_FILE_PATH, description = ATTR_DESCRIPTION_FILE_PATH),
+ @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
+ @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
+ @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
+ @WritesAttribute(attribute = ATTR_NAME_LAST_MODIFIED, description = ATTR_DESCRIPTION_LAST_MODIFIED),
+ @WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG)
+})
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. " +
+ "This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. State is " +
+ "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
+ "where the previous node left off, without duplicating the data.")
+public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo> {
+
+ public static final PropertyDescriptor RECURSE_SUBDIRECTORIES = new PropertyDescriptor.Builder()
+ .name("recurse-subdirectories")
+ .displayName("Recurse Subdirectories")
+ .description("Indicates whether to list files from subdirectories of the directory")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
+ .name("file-filter")
+ .displayName("File Filter")
+ .description("Only files whose names match the given regular expression will be listed")
+ .required(false)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
+ .name("path-filter")
+ .displayName("Path Filter")
+ .description(String.format("When '%s' is true, then only subdirectories whose paths match the given regular expression will be scanned", RECURSE_SUBDIRECTORIES.getDisplayName()))
+ .required(false)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ ADLS_CREDENTIALS_SERVICE,
+ FILESYSTEM,
+ DIRECTORY,
+ RECURSE_SUBDIRECTORIES,
+ FILE_FILTER,
+ PATH_FILTER,
+ RECORD_WRITER,
+ LISTING_STRATEGY,
+ TRACKING_STATE_CACHE,
+ TRACKING_TIME_WINDOW,
+ INITIAL_LISTING_TARGET));
+
+ private static final Set<PropertyDescriptor> LISTING_RESET_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ ADLS_CREDENTIALS_SERVICE,
+ FILESYSTEM,
+ DIRECTORY,
+ RECURSE_SUBDIRECTORIES,
+ FILE_FILTER,
+ PATH_FILTER,
+ LISTING_STRATEGY)));
+
+ private volatile Pattern filePattern;
+ private volatile Pattern pathPattern;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ String fileFilter = context.getProperty(FILE_FILTER).evaluateAttributeExpressions().getValue();
+ filePattern = fileFilter != null ? Pattern.compile(fileFilter) : null;
+
+ String pathFilter = context.getProperty(PATH_FILTER).evaluateAttributeExpressions().getValue();
+ pathPattern = pathFilter != null ? Pattern.compile(pathFilter) : null;
+ }
+
+ @OnStopped
+ public void onStopped() {
+ filePattern = null;
+ pathPattern = null;
+ }
+
+ @Override
+ protected void customValidate(ValidationContext context, Collection<ValidationResult> results) {
+ if (context.getProperty(PATH_FILTER).isSet() && !context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean()) {
+ results.add(new ValidationResult.Builder()
+ .subject(PATH_FILTER.getDisplayName())
+ .valid(false)
+ .explanation(String.format("'%s' cannot be set when '%s' is false", PATH_FILTER.getDisplayName(), RECURSE_SUBDIRECTORIES.getDisplayName()))
+ .build());
+ }
+ }
+
+ @Override
+ protected RecordSchema getRecordSchema() {
+ return ADLSFileInfo.getRecordSchema();
+ }
+
+ @Override
+ protected Scope getStateScope(PropertyContext context) {
+ return Scope.CLUSTER;
+ }
+
+ @Override
+ protected String getDefaultTimePrecision() {
+ return PRECISION_MILLIS.getValue();
+ }
+
+ @Override
+ protected boolean isListingResetNecessary(PropertyDescriptor property) {
+ return LISTING_RESET_PROPERTIES.contains(property);
+ }
+
+ @Override
+ protected String getPath(ProcessContext context) {
+ String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
+ return directory != null ? directory : ".";
+ }
+
+ @Override
+ protected List<ADLSFileInfo> performListing(ProcessContext context, Long minTimestamp) throws IOException {
+ try {
+ String fileSystem = evaluateFileSystemProperty(context, null);
+ String baseDirectory = evaluateDirectoryProperty(context, null);
+ boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
+
+ DataLakeServiceClient storageClient = getStorageClient(context, null);
+ DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
+
+ ListPathsOptions options = new ListPathsOptions();
+ options.setPath(baseDirectory);
+ options.setRecursive(recurseSubdirectories);
+
+ Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
+
+ List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
+ .filter(pathItem -> !pathItem.isDirectory())
+ .map(pathItem -> new ADLSFileInfo.Builder()
+ .fileSystem(fileSystem)
+ .filePath(pathItem.getName())
+ .length(pathItem.getContentLength())
+ .lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
+ .etag(pathItem.getETag())
+ .build())
+ .filter(fileInfo -> filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches())
+ .filter(fileInfo -> pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches())
+ .collect(Collectors.toList());
+
+ return listing;
+ } catch (Exception e) {
+ getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
+ throw new IOException(ExceptionUtils.getRootCause(e));
+ }
+ }
+
+ @Override
+ protected Map<String, String> createAttributes(ADLSFileInfo fileInfo, ProcessContext context) {
+ Map<String, String> attributes = new HashMap<>();
+
+ attributes.put(ATTR_NAME_FILESYSTEM, fileInfo.getFileSystem());
+ attributes.put(ATTR_NAME_FILE_PATH, fileInfo.getFilePath());
+ attributes.put(ATTR_NAME_DIRECTORY, fileInfo.getDirectory());
+ attributes.put(ATTR_NAME_FILENAME, fileInfo.getFilename());
+ attributes.put(ATTR_NAME_LENGTH, String.valueOf(fileInfo.getLength()));
+ attributes.put(ATTR_NAME_LAST_MODIFIED, String.valueOf(fileInfo.getLastModified()));
+ attributes.put(ATTR_NAME_ETAG, fileInfo.getEtag());
+
+ return attributes;
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 081372f..ae67e3e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -21,7 +21,6 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -46,14 +45,25 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
-@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
- @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
- @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
- @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
- @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
+@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
+ @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
+ @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
+ @WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
+ @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
@InputRequirement(Requirement.INPUT_REQUIRED)
public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@@ -93,18 +103,9 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
final long startNanos = System.nanoTime();
try {
- final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
- final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
- final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
-
- if (StringUtils.isBlank(fileSystem)) {
- throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
- FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
- }
- if (StringUtils.isBlank(fileName)) {
- throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
- FILE.getDisplayName() + " must be specified as a non-empty string.");
- }
+ final String fileSystem = evaluateFileSystemProperty(context, flowFile);
+ final String directory = evaluateDirectoryProperty(context, flowFile);
+ final String fileName = evaluateFileNameProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
@@ -126,11 +127,11 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
fileClient.flush(length);
final Map<String, String> attributes = new HashMap<>();
- attributes.put("azure.filesystem", fileSystem);
- attributes.put("azure.directory", directory);
- attributes.put("azure.filename", fileName);
- attributes.put("azure.primaryUri", fileClient.getFileUrl());
- attributes.put("azure.length", String.valueOf(length));
+ attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
+ attributes.put(ATTR_NAME_DIRECTORY, directory);
+ attributes.put(ATTR_NAME_FILENAME, fileName);
+ attributes.put(ATTR_NAME_PRIMARY_URI, fileClient.getFileUrl());
+ attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
new file mode 100644
index 0000000..087cbaa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+public final class ADLSAttributes {
+
+ public static final String ATTR_NAME_FILESYSTEM = "azure.filesystem";
+ public static final String ATTR_DESCRIPTION_FILESYSTEM = "The name of the Azure File System";
+
+ public static final String ATTR_NAME_DIRECTORY = "azure.directory";
+ public static final String ATTR_DESCRIPTION_DIRECTORY = "The name of the Azure Directory";
+
+ public static final String ATTR_NAME_FILENAME = "azure.filename";
+ public static final String ATTR_DESCRIPTION_FILENAME = "The name of the Azure File";
+
+ public static final String ATTR_NAME_LENGTH = "azure.length";
+ public static final String ATTR_DESCRIPTION_LENGTH = "The length of the Azure File";
+
+ public static final String ATTR_NAME_LAST_MODIFIED = "azure.lastModified";
+ public static final String ATTR_DESCRIPTION_LAST_MODIFIED = "The last modification time of the Azure File";
+
+ public static final String ATTR_NAME_ETAG = "azure.etag";
+ public static final String ATTR_DESCRIPTION_ETAG = "The ETag of the Azure File";
+
+ public static final String ATTR_NAME_FILE_PATH = "azure.filePath";
+ public static final String ATTR_DESCRIPTION_FILE_PATH = "The full path of the Azure File";
+
+ public static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri";
+ public static final String ATTR_DESCRIPTION_PRIMARY_URI = "Primary location for file content";
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java
new file mode 100644
index 0000000..7712381
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.util.list.ListableEntity;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ADLSFileInfo implements Comparable<ADLSFileInfo>, Serializable, ListableEntity {
+
+ private static final RecordSchema SCHEMA;
+ private static final String FILESYSTEM = "filesystem";
+ private static final String FILE_PATH = "filePath";
+ private static final String DIRECTORY = "directory";
+ private static final String FILENAME = "filename";
+ private static final String LENGTH = "length";
+ private static final String LAST_MODIFIED = "lastModified";
+ private static final String ETAG = "etag";
+
+ static {
+ List<RecordField> recordFields = new ArrayList<>();
+ recordFields.add(new RecordField(FILESYSTEM, RecordFieldType.STRING.getDataType(), false));
+ recordFields.add(new RecordField(FILE_PATH, RecordFieldType.STRING.getDataType(), false));
+ recordFields.add(new RecordField(DIRECTORY, RecordFieldType.STRING.getDataType(), false));
+ recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+ recordFields.add(new RecordField(LENGTH, RecordFieldType.LONG.getDataType(), false));
+ recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+ recordFields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType()));
+ SCHEMA = new SimpleRecordSchema(recordFields);
+ }
+
+ private static final Comparator<ADLSFileInfo> COMPARATOR = Comparator.comparing(ADLSFileInfo::getFileSystem).thenComparing(ADLSFileInfo::getFilePath);
+
+ private final String fileSystem;
+ private final String filePath;
+ private final long length;
+ private final long lastModified;
+ private final String etag;
+
+ private ADLSFileInfo(Builder builder) {
+ this.fileSystem = builder.fileSystem;
+ this.filePath = builder.filePath;
+ this.length = builder.length;
+ this.lastModified = builder.lastModified;
+ this.etag = builder.etag;
+ }
+
+ public String getFileSystem() {
+ return fileSystem;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ public String getEtag() {
+ return etag;
+ }
+
+ public String getDirectory() {
+ return filePath.contains("/") ? StringUtils.substringBeforeLast(filePath, "/") : "";
+ }
+
+ public String getFilename() {
+ return filePath.contains("/") ? StringUtils.substringAfterLast(filePath, "/") : filePath;
+ }
+
+ @Override
+ public String getName() {
+ return getFilePath();
+ }
+
+ @Override
+ public String getIdentifier() {
+ return getFilePath();
+ }
+
+ @Override
+ public long getTimestamp() {
+ return getLastModified();
+ }
+
+ @Override
+ public long getSize() {
+ return getLength();
+ }
+
+ public static RecordSchema getRecordSchema() {
+ return SCHEMA;
+ }
+
+ @Override
+ public Record toRecord() {
+ Map<String, Object> values = new HashMap<>();
+ values.put(FILESYSTEM, getFileSystem());
+ values.put(FILE_PATH, getFilePath());
+ values.put(DIRECTORY, getDirectory());
+ values.put(FILENAME, getFilename());
+ values.put(LENGTH, getLength());
+ values.put(LAST_MODIFIED, getLastModified());
+ values.put(ETAG, getEtag());
+ return new MapRecord(SCHEMA, values);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ ADLSFileInfo otherFileInfo = (ADLSFileInfo) other;
+ return Objects.equals(fileSystem, otherFileInfo.fileSystem)
+ && Objects.equals(filePath, otherFileInfo.filePath);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fileSystem, filePath);
+ }
+
+ @Override
+ public int compareTo(ADLSFileInfo other) {
+ return COMPARATOR.compare(this, other);
+ }
+
+ public static class Builder {
+ private String fileSystem;
+ private String filePath;
+ private long length;
+ private long lastModified;
+ private String etag;
+
+ public Builder fileSystem(String fileSystem) {
+ this.fileSystem = fileSystem;
+ return this;
+ }
+
+ public Builder filePath(String filePath) {
+ this.filePath = filePath;
+ return this;
+ }
+
+ public Builder length(long length) {
+ this.length = length;
+ return this;
+ }
+
+ public Builder lastModified(long lastModified) {
+ this.lastModified = lastModified;
+ return this;
+ }
+
+ public Builder etag(String etag) {
+ this.etag = etag;
+ return this;
+ }
+
+ public ADLSFileInfo build() {
+ return new ADLSFileInfo(this);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6e09330..015596e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -23,4 +23,5 @@ org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
-org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
\ No newline at end of file
+org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
+org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index da03eae..553621b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -22,6 +22,7 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService;
@@ -35,6 +36,7 @@ import java.util.UUID;
public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {
private static final String FILESYSTEM_NAME_PREFIX = "nifi-test-filesystem";
+ private static final String TEST_FILE_CONTENT = "test";
protected String fileSystemName;
protected DataLakeFileSystemClient fileSystemClient;
@@ -72,6 +74,10 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
.buildClient();
}
+ protected void createDirectory(String directory) {
+ fileSystemClient.createDirectory(directory);
+ }
+
protected void uploadFile(String directory, String filename, String fileContent) {
byte[] fileContentBytes = fileContent.getBytes();
@@ -82,9 +88,49 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
fileClient.flush(fileContentBytes.length);
}
+ protected void uploadFile(TestFile testFile) {
+ uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
+ }
+
protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
- fileSystemClient.createDirectory(directory);
+ createDirectory(directory);
uploadFile(directory, filename, fileContent);
}
+
+ protected void createDirectoryAndUploadFile(TestFile testFile) {
+ createDirectoryAndUploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
+ }
+
+ protected static class TestFile {
+ private final String directory;
+ private final String filename;
+ private final String fileContent;
+
+ public TestFile(String directory, String filename, String fileContent) {
+ this.directory = directory;
+ this.filename = filename;
+ this.fileContent = fileContent;
+ }
+
+ public TestFile(String directory, String filename) {
+ this(directory, filename, TEST_FILE_CONTENT);
+ }
+
+ public String getDirectory() {
+ return directory;
+ }
+
+ public String getFilename() {
+ return filename;
+ }
+
+ public String getFileContent() {
+ return fileContent;
+ }
+
+ public String getFilePath() {
+ return StringUtils.isNotBlank(directory) ? String.format("%s/%s", directory, filename) : filename;
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
new file mode 100644
index 0000000..fd308bc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
+
+ private Map<String, TestFile> testFiles;
+
+ @Override
+ protected Class<? extends Processor> getProcessorClass() {
+ return ListAzureDataLakeStorage.class;
+ }
+
+ @Before
+ public void setUp() {
+ testFiles = new HashMap<>();
+
+ TestFile testFile1 = new TestFile("", "file1");
+ uploadFile(testFile1);
+ testFiles.put(testFile1.getFilePath(), testFile1);
+
+ TestFile testFile2 = new TestFile("", "file2");
+ uploadFile(testFile2);
+ testFiles.put(testFile2.getFilePath(), testFile2);
+
+ TestFile testFile11 = new TestFile("dir1", "file11");
+ createDirectoryAndUploadFile(testFile11);
+ testFiles.put(testFile11.getFilePath(), testFile11);
+
+ TestFile testFile12 = new TestFile("dir1", "file12");
+ uploadFile(testFile12);
+ testFiles.put(testFile12.getFilePath(), testFile12);
+
+ TestFile testFile111 = new TestFile("dir1/dir11", "file111");
+ createDirectoryAndUploadFile(testFile111);
+ testFiles.put(testFile111.getFilePath(), testFile111);
+
+ TestFile testFile21 = new TestFile("dir 2", "file 21");
+ createDirectoryAndUploadFile(testFile21);
+ testFiles.put(testFile21.getFilePath(), testFile21);
+
+ createDirectory("dir3");
+ }
+
+ @Test
+ public void testListRootRecursive() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+
+ runProcessor();
+
+ assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
+ }
+
+ @Test
+ public void testListRootNonRecursive() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+ runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
+
+ runProcessor();
+
+ assertSuccess("file1", "file2");
+ }
+
+ @Test
+ public void testListSubdirectoryRecursive() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
+ }
+
+ @Test
+ public void testListSubdirectoryNonRecursive() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
+ runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/file12");
+ }
+
+ @Test
+ public void testListWithFileFilter() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+ runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file1.*$");
+
+ runProcessor();
+
+ assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
+ }
+
+ @Test
+ public void testListWithFileFilterWithEL() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+ runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file${suffix}$");
+ runner.setVariable("suffix", "1.*");
+
+ runProcessor();
+
+ assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
+ }
+
+ @Test
+ public void testListRootWithPathFilter() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+ runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
+ }
+
+ @Test
+ public void testListRootWithPathFilterWithEL() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+ runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
+ runner.setVariable("prefix", "^dir");
+ runner.setVariable("suffix", "1.*$");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
+ }
+
+ @Test
+ public void testListSubdirectoryWithPathFilter() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
+ runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
+
+ runProcessor();
+
+ assertSuccess("dir1/dir11/file111");
+ }
+
+ @Test
+ public void testListRootWithFileAndPathFilter() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+ runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
+ runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
+
+ runProcessor();
+
+ assertSuccess("dir1/file11", "dir1/dir11/file111");
+ }
+
+ @Test
+ public void testListEmptyDirectory() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3");
+
+ runProcessor();
+
+ assertSuccess();
+ }
+
+ @Test
+ public void testListNonExistingDirectory() {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dummy");
+
+ runProcessor();
+
+ assertFailure();
+ }
+
+ @Test
+ public void testListWithNonExistingFileSystem() {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, "dummy");
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+
+ runProcessor();
+
+ assertFailure();
+ }
+
+ @Test
+ public void testListWithRecords() throws InitializationException {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
+
+ MockRecordWriter recordWriter = new MockRecordWriter(null, false);
+ runner.addControllerService("record-writer", recordWriter);
+ runner.enableControllerService(recordWriter);
+ runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer");
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0);
+ flowFile.assertAttributeEquals("record.count", "3");
+ }
+
+ private void runProcessor() {
+ runner.assertValid();
+ runner.run();
+ }
+
+ private void assertSuccess(String... testFilePaths) throws Exception {
+ runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, testFilePaths.length);
+
+ Map<String, TestFile> expectedFiles = new HashMap<>(testFiles);
+ expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths));
+
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS);
+
+ for (MockFlowFile flowFile : flowFiles) {
+ String filePath = flowFile.getAttribute("azure.filePath");
+ TestFile testFile = expectedFiles.remove(filePath);
+ assertNotNull("File path not found in the expected map", testFile);
+ assertFlowFile(testFile, flowFile);
+ }
+ }
+
+ private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) throws Exception {
+ flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
+ flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath());
+ flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory());
+ flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, testFile.getFilename());
+ flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, String.valueOf(testFile.getFileContent().length()));
+
+ flowFile.assertAttributeExists(ATTR_NAME_LAST_MODIFIED);
+ flowFile.assertAttributeExists(ATTR_NAME_ETAG);
+
+ flowFile.assertContentEquals("");
+ }
+
+ private void assertFailure() {
+ assertFalse(runner.getLogger().getErrorMessages().isEmpty());
+ runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 049031e..324e20d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -35,6 +35,11 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -149,15 +154,6 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
- public void testPutFileWithInvalidDirectory() {
- runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, "/dir1");
-
- runProcessor(FILE_DATA);
-
- assertFailure();
- }
-
- @Test
public void testPutFileWithInvalidFileName() {
runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1");
@@ -282,18 +278,18 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
private void assertFlowFile(byte[] fileData, String fileName, String directory) throws Exception {
MockFlowFile flowFile = assertFlowFile(fileData);
- flowFile.assertAttributeEquals("azure.filesystem", fileSystemName);
- flowFile.assertAttributeEquals("azure.directory", directory);
- flowFile.assertAttributeEquals("azure.filename", fileName);
+ flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
+ flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, directory);
+ flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);
String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(directory);
String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
String primaryUri = StringUtils.isNotEmpty(directory)
? String.format("https://%s.dfs.core.windows.net/%s/%s/%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedFileName)
: String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
- flowFile.assertAttributeEquals("azure.primaryUri", primaryUri);
+ flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);
- flowFile.assertAttributeEquals("azure.length", Integer.toString(fileData.length));
+ flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
}
private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
index 94fc576..a4a4c64 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
@@ -47,7 +47,11 @@ public class TestAbstractAzureDataLakeStorage {
runner.setProperty(DIRECTORY, "directory");
runner.setProperty(FILE, "file");
runner.setProperty(ADLS_CREDENTIALS_SERVICE, "credentials_service");
+ }
+ @Test
+ public void testValid() {
+ runner.assertValid();
}
@Test
@@ -80,6 +84,27 @@ public class TestAbstractAzureDataLakeStorage {
}
@Test
+ public void testNotValidWhenDirectoryIsSlash() {
+ runner.setProperty(DIRECTORY, "/");
+
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWhenDirectoryStartsWithSlash() {
+ runner.setProperty(DIRECTORY, "/directory");
+
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNotValidWhenDirectoryIsWhitespaceOnly() {
+ runner.setProperty(DIRECTORY, " ");
+
+ runner.assertNotValid();
+ }
+
+ @Test
public void testValidWhenNoFileSpecified() {
// the default value will be used
runner.removeProperty(FILE);