You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/13 13:03:04 UTC

[nifi] branch master updated: NIFI-7367: Add tests for FetchAzureDataLakeStorage

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9608fe3  NIFI-7367: Add tests for FetchAzureDataLakeStorage
9608fe3 is described below

commit 9608fe33fa4a3f39ee6ddca54184ccc85ef1a163
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Wed May 6 18:28:47 2020 +0200

    NIFI-7367: Add tests for FetchAzureDataLakeStorage
    
    NIFI-7367: Negative test cases for expression language in FetchAzureDataLakeStorage
    FetchAzureDataLakeStorage now raises a ProcessException, which is logged and routes the flowfile to failure, when the filesystem or filename evaluates to a blank value.
    Fixed the logged error messages in all 3 of the Delete, Fetch and Put ADLS processors (removed the trailing "due to {}" placeholder from each message).
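    testFetchDirectory test case marked as ignored, because fetching a directory currently produces an empty flowfile and that behavior is expected to change.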
    
    This closes #4257.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../azure/storage/DeleteAzureDataLakeStorage.java  |   2 +-
 .../azure/storage/FetchAzureDataLakeStorage.java   |  29 +-
 .../azure/storage/PutAzureDataLakeStorage.java     |   2 +-
 .../storage/AbstractAzureDataLakeStorageIT.java    |  19 +
 .../azure/storage/ITFetchAzureDataLakeStorage.java | 413 ++++++++++++++++++++-
 .../azure/storage/ITPutAzureDataLakeStorage.java   |  15 +
 6 files changed, 450 insertions(+), 30 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index bf29087..86b23aa 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -64,7 +64,7 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
             final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().invokeRemoteProcess(flowFile, fileClient.getFileUrl(), "File deleted");
         } catch (Exception e) {
-            getLogger().error("Failed to delete the specified file from Azure Data Lake Storage,  due to {}", e);
+            getLogger().error("Failed to delete the specified file from Azure Data Lake Storage", e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 1d0e8d4..a616439 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -16,8 +16,11 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
-import java.util.concurrent.TimeUnit;
-
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -29,11 +32,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 
-import com.azure.storage.file.datalake.DataLakeDirectoryClient;
-import com.azure.storage.file.datalake.DataLakeFileSystemClient;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeFileClient;
-
+import java.util.concurrent.TimeUnit;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
 @SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
@@ -44,17 +43,25 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
-
         if (flowFile == null) {
             return;
         }
 
         final long startNanos = System.nanoTime();
-
         try {
             final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
             final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
             final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+
+            if (StringUtils.isBlank(fileSystem)) {
+                throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
+                        FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
+            }
+            if (StringUtils.isBlank(fileName)) {
+                throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
+                        FILE.getDisplayName() + " must be specified as a non-empty string.");
+            }
+
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
             final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
             final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
@@ -67,9 +74,9 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
             final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().fetch(flowFile, fileClient.getFileUrl(), transferMillis);
         } catch (Exception e) {
-            getLogger().error("Failure to fetch file from Azure Data Lake Storage, due to {}", e);
+            getLogger().error("Failure to fetch file from Azure Data Lake Storage", e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 94fa90f..7d395a3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -98,7 +98,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
             final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
         } catch (Exception e) {
-            getLogger().error("Failed to create file, due to {}", e);
+            getLogger().error("Failed to create file", e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index a9b2bd9..e0b591a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.processors.azure.storage;
 
 import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
@@ -24,6 +26,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 import org.junit.After;
 import org.junit.Before;
 
+import java.io.ByteArrayInputStream;
 import java.util.UUID;
 
 public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {
@@ -54,4 +57,20 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
                 .credential(new StorageSharedKeyCredential(getAccountName(), getAccountKey()))
                 .buildClient();
     }
+
+    protected void uploadFile(String directory, String filename, String fileContent) {
+        byte[] fileContentBytes = fileContent.getBytes();
+
+        DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
+        DataLakeFileClient fileClient = directoryClient.createFile(filename);
+
+        fileClient.append(new ByteArrayInputStream(fileContentBytes), 0, fileContentBytes.length);
+        fileClient.flush(fileContentBytes.length);
+    }
+
+    protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
+        fileSystemClient.createDirectory(directory);
+
+        uploadFile(directory, filename, fileContent);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
index a242ede..f08b75e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -16,43 +16,422 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.google.common.collect.Sets;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
-import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
-public class ITFetchAzureDataLakeStorage extends AbstractAzureBlobStorageIT {
+import static org.junit.Assert.assertEquals;
+
+public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
 
     @Override
     protected Class<? extends Processor> getProcessorClass() {
         return FetchAzureDataLakeStorage.class;
     }
 
-    @Before
-    public void setUp() throws Exception {
-        runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME);
+    @Test
+    public void testFetchFileFromDirectory() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+    }
+
+    @Test
+    public void testFetchFileFromRoot() {
+        // GIVEN
+        String directory= "";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        uploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+    }
+
+    @Test
+    public void testFetchFileFromDirectoryWithWhitespace() {
+        // GIVEN
+        String directory= "A Test Directory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+    }
+
+    @Test
+    public void testFetchFileWithWhitespaceFromDirectory() {
+        // GIVEN
+        String directory= "TestDirectory";
+        String filename = "A test file.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+    }
+
+    @Test
+    public void testFetchFileCaseSensitiveFilename() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename1 = "testFile.txt";
+        String filename2 = "testfile.txt";
+        String fileContent1 = "ContentOfFile1";
+        String fileContent2 = "ContentOfFile2";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename1, fileContent1);
+        createDirectoryAndUploadFile(directory, filename2, fileContent2);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename1, inputFlowFileContent, fileContent1);
+        runner.clearProvenanceEvents();
+        runner.clearTransferState();
+        testSuccessfulFetch(fileSystemName, directory, filename2, inputFlowFileContent, fileContent2);
+    }
+
+    @Test
+    public void testFetchFileCaseSensitiveDirectoryName() {
+        // GIVEN
+        String directory1 = "TestDirectory";
+        String directory2 = "Testdirectory";
+        String filename1 = "testFile1.txt";
+        String filename2 = "testFile2.txt";
+        String fileContent1 = "ContentOfFile1";
+        String fileContent2 = "ContentOfFile2";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory1, filename1, fileContent1);
+        createDirectoryAndUploadFile(directory2, filename2, fileContent2);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory1, filename1, inputFlowFileContent, fileContent1);
+        runner.clearProvenanceEvents();
+        runner.clearTransferState();
+        testSuccessfulFetch(fileSystemName, directory2, filename2, inputFlowFileContent, fileContent2);
+    }
+
+    @Test
+    public void testFetchFileFromDeepDirectoryStructure() {
+        // GIVEN
+        String directory= "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
+                + "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
+                + "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+    }
+
+    @Ignore("Fetching a directory currently produces an empty flowfile. This will change in the future, and this test case will need to be modified.")
+    @Test
+    public void testFetchDirectory() {
+        // GIVEN
+        String parentDirectory = "ParentDirectory";
+        String childDirectory = "ChildDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+        String expectedFlowFileContent = "";
+
+        createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, expectedFlowFileContent);
+    }
+
+    @Test
+    public void testFetchNonExistentFileSystem() {
+        // GIVEN
+        String fileSystem = "NonExistentFileSystem";
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        // WHEN
+        // THEN
+        testFailedFetch(fileSystem, directory, filename, inputFlowFileContent, inputFlowFileContent, 400);
+    }
+
+    @Test
+    public void testFetchNonExistentDirectory() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        // WHEN
+        // THEN
+        testFailedFetch(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
+    }
+
+    @Test
+    public void testFetchNonExistentFile() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        fileSystemClient.createDirectory(directory);
+
+        // WHEN
+        // THEN
+        testFailedFetch(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
+    }
+
+    @Ignore("Takes some time, only recommended for manual testing.")
+    @Test
+    public void testFetchLargeFile() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        byte[] fileContentBytes = new byte[100_000_000];
+        String fileContent = new String(fileContentBytes);
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+    }
+
+    @Test
+    public void testFetchInvalidDirectoryName() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String invalidDirectoryName = "Test/\\Directory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testFailedFetch(fileSystemName, invalidDirectoryName, filename, inputFlowFileContent, inputFlowFileContent, 404);
+    }
+
+    @Test
+    public void testFetchInvalidFilename() {
+        // GIVEN
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String invalidFilename = "test/\\File.txt";
+        String fileContent = "AzureFileContent";
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testFailedFetch(fileSystemName, directory, invalidFilename, inputFlowFileContent, inputFlowFileContent, 404);
+    }
+
+    @Test
+    public void testFetchUsingExpressionLanguage() {
+        // GIVEN
+        String expLangFileSystem = "az.filesystem";
+        String expLangDirectory = "az.directory";
+        String expLangFilename = "az.filename";
+
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(expLangFileSystem, fileSystemName);
+        attributes.put(expLangDirectory, directory);
+        attributes.put(expLangFilename, filename);
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testSuccessfulFetch("${" + expLangFileSystem + "}",
+                    "${" + expLangDirectory + "}",
+                    "${" + expLangFilename + "}",
+                            attributes,
+                            inputFlowFileContent,
+                            fileContent);
+    }
+
+    @Test
+    public void testFetchUsingExpressionLanguageFileSystemIsNotSpecified() {
+        // GIVEN
+        String expLangFileSystem = "az.filesystem";
+        String expLangDirectory = "az.directory";
+        String expLangFilename = "az.filename";
+
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(expLangDirectory, directory);
+        attributes.put(expLangFilename, filename);
 
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testFailedFetchWithProcessException("${" + expLangFileSystem + "}",
+                "${" + expLangDirectory + "}",
+                "${" + expLangFilename + "}",
+                attributes,
+                inputFlowFileContent,
+                inputFlowFileContent);
     }
 
-    @Ignore
     @Test
-    public void testFetchFile() throws Exception {
+    public void testFetchUsingExpressionLanguageFilenameIsNotSpecified() {
+        // GIVEN
+        String expLangFileSystem = "az.filesystem";
+        String expLangDirectory = "az.directory";
+        String expLangFilename = "az.filename";
+
+        String directory = "TestDirectory";
+        String filename = "testFile.txt";
+        String fileContent = "AzureFileContent";
+
+        String inputFlowFileContent = "InputFlowFileContent";
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(expLangFileSystem, fileSystemName);
+        attributes.put(expLangDirectory, directory);
+
+        createDirectoryAndUploadFile(directory, filename, fileContent);
+
+        // WHEN
+        // THEN
+        testFailedFetchWithProcessException("${" + expLangFileSystem + "}",
+                "${" + expLangDirectory + "}",
+                "${" + expLangFilename + "}",
+                attributes,
+                inputFlowFileContent,
+                inputFlowFileContent);
+    }
+
+    private void testSuccessfulFetch(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
+        testSuccessfulFetch(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
+    }
+
+    private void testSuccessfulFetch(String fileSystem, String directory, String filename, Map<String, String> attributes, String inputFlowFileContent, String expectedFlowFileContent) {
+        // GIVEN
+        Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.CONTENT_MODIFIED, ProvenanceEventType.FETCH);
+
+        setRunnerProperties(fileSystem, directory, filename);
+
+        // WHEN
+        startRunner(inputFlowFileContent, attributes);
+
+        // THEN
+        assertSuccess(expectedFlowFileContent, expectedEventTypes);
+    }
+
+    private void testFailedFetch(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent, int expectedErrorCode) {
+        testFailedFetch(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent, expectedErrorCode);
+    }
+
+    private void testFailedFetch(String fileSystem, String directory, String filename, Map<String, String> attributes,
+                                 String inputFlowFileContent, String expectedFlowFileContent, int expectedErrorCode) {
+        // GIVEN
+        setRunnerProperties(fileSystem, directory, filename);
+
+        // WHEN
+        startRunner(inputFlowFileContent, attributes);
+
+        // THEN
+        DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable();
+        assertEquals(expectedErrorCode, e.getStatusCode());
+
+        assertFailure(expectedFlowFileContent);
+    }
+
+    private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, Map<String, String> attributes,
+                                                     String inputFlowFileContent, String expectedFlowFileContent) {
+        // GIVEN
+        setRunnerProperties(fileSystem, directory, filename);
+
+        // WHEN
+        startRunner(inputFlowFileContent, attributes);
+
+        // THEN
+        Throwable exception = runner.getLogger().getErrorMessages().get(0).getThrowable();
+        assertEquals(ProcessException.class, exception.getClass());
+
+        assertFailure(expectedFlowFileContent);
+    }
+
+    private void setRunnerProperties(String fileSystem, String directory, String filename) {
+        runner.setProperty(FetchAzureDataLakeStorage.FILESYSTEM, fileSystem);
+        runner.setProperty(FetchAzureDataLakeStorage.DIRECTORY, directory);
+        runner.setProperty(FetchAzureDataLakeStorage.FILE, filename);
         runner.assertValid();
-        runner.enqueue(new byte[0]);
+    }
+
+    private void startRunner(String inputFlowFileContent, Map<String, String> attributes) {
+        runner.enqueue(inputFlowFileContent, attributes);
         runner.run();
+    }
+
+    private void assertSuccess(String expectedFlowFileContent, Set<ProvenanceEventType> expectedEventTypes) {
+        runner.assertAllFlowFilesTransferred(FetchAzureDataLakeStorage.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureDataLakeStorage.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(expectedFlowFileContent);
 
-        assertResult();
+        Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+                .map(ProvenanceEventRecord::getEventType)
+                .collect(Collectors.toSet());
+        assertEquals(expectedEventTypes, actualEventTypes);
     }
 
-    private void assertResult() throws Exception {
-        runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
-        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS);
-        for (MockFlowFile flowFile : flowFilesForRelationship) {
-            flowFile.assertContentEquals("0123456789".getBytes());
-            flowFile.assertAttributeEquals("azure.length", "10");
-        }
+    private void assertFailure(String expectedFlowFileContent) {
+        runner.assertAllFlowFilesTransferred(FetchAzureDataLakeStorage.REL_FAILURE, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureDataLakeStorage.REL_FAILURE).get(0);
+        flowFile.assertContentEquals(expectedFlowFileContent);
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 1a6105f..2cf53ec 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -18,9 +18,12 @@ package org.apache.nifi.processors.azure.storage;
 
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.google.common.collect.Sets;
 import com.google.common.net.UrlEscapers;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -29,6 +32,8 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -224,6 +229,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
     private void assertSuccess(String directory, String fileName, byte[] fileData) throws Exception {
         assertFlowFile(directory, fileName, fileData);
         assertAzureFile(directory, fileName, fileData);
+        assertProvenanceEvents();
     }
 
     private void assertFlowFile(String directory, String fileName, byte[] fileData) throws Exception {
@@ -262,6 +268,15 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertEquals(fileData.length, fileClient.getProperties().getFileSize());
     }
 
+    private void assertProvenanceEvents() {
+        Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.SEND);
+
+        Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+                .map(ProvenanceEventRecord::getEventType)
+                .collect(Collectors.toSet());
+        assertEquals(expectedEventTypes, actualEventTypes);
+    }
+
     private void assertFailure() {
         runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_FAILURE, 1);
     }