You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/18 21:52:29 UTC

[nifi] branch master updated: NIFI-7446: FetchAzureDataLakeStorage processor now throws exception when the specified path points to a directory

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aae58f  NIFI-7446: FetchAzureDataLakeStorage processor now throws exception when the specified path points to a directory
9aae58f is described below

commit 9aae58f1178cacb6ee4814f26288bf8ca5150d71
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Thu May 14 14:35:34 2020 +0200

    NIFI-7446: FetchAzureDataLakeStorage processor now throws exception when the specified path points to a directory
    
    A newer version (12.1.1) of azure-storage-file-datalake is imported.
    
    This closes #4273.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi-azure-bundle/nifi-azure-processors/pom.xml            |  2 +-
 .../processors/azure/storage/FetchAzureDataLakeStorage.java    |  4 ++++
 .../processors/azure/storage/ITFetchAzureDataLakeStorage.java  | 10 ++++++----
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index b0e80df..0a3602d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -73,7 +73,7 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
-            <version>12.0.1</version>
+            <version>12.1.1</version>
         </dependency>
         <!-- overriding jackson-core in azure-storage -->
         <dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index a616439..2d80ea3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -67,6 +67,10 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
             final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
             final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
 
+            if (fileClient.getProperties().isDirectory()) {
+                throw new ProcessException(FILE.getDisplayName() + " (" + fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
+            }
+
             flowFile = session.write(flowFile, os -> fileClient.read(os));
             session.getProvenanceReporter().modifyContent(flowFile);
             session.transfer(flowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
index f08b75e..de35508 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -112,7 +112,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         String inputFlowFileContent = "InputFlowFileContent";
 
         createDirectoryAndUploadFile(directory, filename1, fileContent1);
-        createDirectoryAndUploadFile(directory, filename2, fileContent2);
+        uploadFile(directory, filename2, fileContent2);
 
         // WHEN
         // THEN
@@ -161,7 +161,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
     }
 
-    @Ignore("Fetching a directory currently produces an empty flowfile. This will change in the future, and this test case will need to be modified.")
     @Test
     public void testFetchDirectory() {
         // GIVEN
@@ -170,13 +169,12 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         String filename = "testFile.txt";
         String fileContent = "AzureFileContent";
         String inputFlowFileContent = "InputFlowFileContent";
-        String expectedFlowFileContent = "";
 
         createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, fileContent);
 
         // WHEN
         // THEN
-        testSuccessfulFetch(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, expectedFlowFileContent);
+        testFailedFetchWithProcessException(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, inputFlowFileContent);
     }
 
     @Test
@@ -391,6 +389,10 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         assertFailure(expectedFlowFileContent);
     }
 
+    private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
+        testFailedFetchWithProcessException(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
+    }
+
     private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, Map<String, String> attributes,
                                                      String inputFlowFileContent, String expectedFlowFileContent) {
         // GIVEN